global refactor
parent
8617a105e2
commit
3ca717df26
|
|
@ -0,0 +1,214 @@
|
|||
use std::fs;
|
||||
use crate::structs::*;
|
||||
use log::{error, info, warn};
|
||||
use redis::{Client, Commands, Connection};
|
||||
use tokio::time::Duration;
|
||||
|
||||
static CONFIG_PATH : &'static str = "settings.json";
|
||||
// 4ever sync
|
||||
fn load_processes(json_filename: &str) -> Option<Processes>{
|
||||
match fs::read_to_string(json_filename) {
|
||||
Ok(res) => {
|
||||
match serde_json::from_str::<Processes>(&res) {
|
||||
Ok(conf) => {
|
||||
return Some(conf);
|
||||
},
|
||||
Err(_) => {
|
||||
return None;
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
return None;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_actual_config() -> Option<Processes> {
|
||||
|
||||
// todo: local check Some|None -> redis check
|
||||
// * if no conf -> loop and +inf getting conf from redis server
|
||||
// let mut local = load_processes(&CONFIG_PATH);
|
||||
match load_processes(&CONFIG_PATH) {
|
||||
Some(local_conf) => {
|
||||
if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") {
|
||||
match config_comparing(&local_conf, &remote_conf) {
|
||||
ConfigActuality::Local => {
|
||||
info!("Local config is actual");
|
||||
return Some(local_conf);
|
||||
},
|
||||
ConfigActuality::Remote => {
|
||||
info!("Pulled config is more actual. Saving changes!");
|
||||
if save_new_config(&remote_conf, &CONFIG_PATH).is_err() {
|
||||
error!("Saving changes process failed due to unexpected error...")
|
||||
}
|
||||
return Some(remote_conf);
|
||||
},
|
||||
}
|
||||
}
|
||||
return Some(local_conf);
|
||||
},
|
||||
None => {
|
||||
// ? ? OUTSTANDING CONSTRUCTION ?
|
||||
let mut conn = get_connection_watcher(&open_watcher("redis://localhost"));
|
||||
get_stream_info_watcher(&mut conn);
|
||||
let remote_config = invalid_config_watcher(&mut conn);
|
||||
let _ = save_new_config(&remote_config, &CONFIG_PATH);
|
||||
Some(remote_config)
|
||||
},
|
||||
}
|
||||
}
|
||||
// ! once iter exec
|
||||
// ! only for situation when local isnt None (no need to fck redis server)
|
||||
fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
|
||||
match redis::Client::open(serv_info) {
|
||||
Ok(client) => {
|
||||
match client.get_connection() {
|
||||
Ok(mut conn) => {
|
||||
if let Ok(len) = conn.xlen::<&str, usize>("config_stream") {
|
||||
if len == 0 {
|
||||
warn!("No configuration in DB yet");
|
||||
return None;
|
||||
} else {
|
||||
let conf: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>> = conn.xrevrange_count("config_stream", "+", "-", 1);
|
||||
let config: &Vec<(String, Vec<(String, String)>)>;
|
||||
|
||||
if conf.is_ok() {
|
||||
// guarranted safe unwrapping
|
||||
let conf = conf.unwrap();
|
||||
config = &conf[0];
|
||||
if config.is_empty() {
|
||||
error!("Empty config was pulled. Check stream and configs state!");
|
||||
return None;
|
||||
}
|
||||
match parse_extern_config(&config[0].1[0].1) {
|
||||
Some(prcs) => return Some(prcs),
|
||||
None => {
|
||||
error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!");
|
||||
return None;
|
||||
},
|
||||
}
|
||||
} else {
|
||||
error!("Configuration pulling from Redis stream failed. Check stream state!");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("Cannot find config_stream. Check Redis-stream accessibility!");
|
||||
return None;
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis connection attempt is failed. Check Redis configuration!");
|
||||
return None;
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis-Client opening attempt is failed. Check network configuration!");
|
||||
return None;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ! watchers
|
||||
|
||||
fn open_watcher(serv_info: &str) -> redis::Client {
|
||||
loop {
|
||||
match redis::Client::open(serv_info) {
|
||||
Ok(redis) => {
|
||||
info!("Succesfully opened Redis-Client");
|
||||
return redis
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis-Client opening attempt is failed. Check network configuration! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_connection_watcher(client: &Client) -> Connection {
|
||||
loop {
|
||||
match client.get_connection() {
|
||||
Ok(conn) => {
|
||||
info!("Succesfully got Redis connection object");
|
||||
return conn;
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis connection attempt is failed. Check Redis configuration! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fn get_stream_info_watcher(conn: &mut Connection) {
|
||||
loop {
|
||||
if let Ok(val) = conn.xlen::<&str, usize>("config_stream") {
|
||||
if val != 0 {
|
||||
info!("Redis stream is able and not empty now");
|
||||
return;
|
||||
}
|
||||
}
|
||||
error!("Configuration pulling from Redis stream failed. Check stream state! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
fn invalid_config_watcher(conn: &mut Connection) -> Processes {
|
||||
// let res: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>>;
|
||||
loop {
|
||||
let res: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>> = conn.xrevrange_count("config_stream", "+", "-", 1);
|
||||
if res.is_ok() {
|
||||
let config = &res.unwrap()[0];
|
||||
if !config.is_empty() {
|
||||
if let Some(conf) = parse_extern_config(&config[0].1[0].1) {
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("Got INVALID configuration. Update config! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
|
||||
// ! end of watchers
|
||||
|
||||
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
||||
let local_date: u64 = local.date_of_creation.parse().unwrap();
|
||||
let remote_date: u64 = remote.date_of_creation.parse().unwrap();
|
||||
|
||||
match local_date.cmp(&remote_date) {
|
||||
std::cmp::Ordering::Equal |
|
||||
std::cmp::Ordering::Greater => return ConfigActuality::Local,
|
||||
std::cmp::Ordering::Less => return ConfigActuality::Remote,
|
||||
}
|
||||
}
|
||||
|
||||
// ! TEMPORARLY DEPRICATED !
|
||||
// fn native_date_from_milis(mls: &str) -> Option<chrono::DateTime<Utc>> {
|
||||
// match mls.parse::<i64>(){
|
||||
// Ok(val) => return chrono::DateTime::from_timestamp_millis(val),
|
||||
// Err(_) => return None,
|
||||
// }
|
||||
// }
|
||||
|
||||
fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> {
|
||||
match serde_json::to_string_pretty(&config) {
|
||||
Ok(st) => {
|
||||
match fs::write(config_file, st) {
|
||||
Ok(_) => return Ok(()),
|
||||
Err(_) => return Err(CustomError::Fatal),
|
||||
}
|
||||
},
|
||||
Err(_) => return Err(CustomError::Fatal),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_extern_config(json_string: &str) -> Option<Processes> {
|
||||
let des = serde_json::from_str::<Processes>(json_string);
|
||||
if des.is_err() {
|
||||
return None;
|
||||
} else {
|
||||
return Some(des.unwrap());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,129 @@
|
|||
use crate::structs::{Files, CustomError};
|
||||
use inotify::{ EventMask, Inotify, WatchMask };
|
||||
use log::error;
|
||||
use tokio::sync::mpsc;
|
||||
use crate::prcs::{is_frozen, is_active};
|
||||
use tokio::time::Duration;
|
||||
use std::sync::Arc;
|
||||
use std::path::Path;
|
||||
use std::borrow::BorrowMut;
|
||||
|
||||
pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
|
||||
let src = format!("{}{}", path, filename);
|
||||
let mut inotify = Inotify::init().unwrap_or_else(|_| {
|
||||
error!("{}",format!("Cannot create watcher for {}", &src));
|
||||
std::process::exit(101);
|
||||
});
|
||||
_ = inotify
|
||||
.watches()
|
||||
.add(
|
||||
&src,
|
||||
WatchMask::ALL_EVENTS
|
||||
);
|
||||
|
||||
Ok(inotify)
|
||||
}
|
||||
|
||||
pub async fn file_handler
|
||||
(
|
||||
name: &str,
|
||||
files: &Vec<Files>,
|
||||
tx: Arc<mpsc::Sender<u8>>,
|
||||
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>
|
||||
) -> Result<(), CustomError>
|
||||
{
|
||||
// println!("file daemon on {}", name);
|
||||
for (i, file) in files.iter().enumerate() {
|
||||
// let src = format!("{}{}", file.src, file.filename);
|
||||
if check_file(&file.filename, &file.src).await.is_err() {
|
||||
if !is_active(name).await || is_frozen(name).await {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
match file.triggers.on_delete.as_str() {
|
||||
"stay" => {
|
||||
continue;
|
||||
},
|
||||
"stop" => {
|
||||
if is_active(name).await {
|
||||
tx.send(1).await.unwrap();
|
||||
}
|
||||
return Err(CustomError::Fatal);
|
||||
},
|
||||
"hold" => {
|
||||
if is_active(name).await {
|
||||
tx.send(2).await.unwrap();
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
tx.send(101).await.unwrap();
|
||||
return Err(CustomError::Fatal);
|
||||
},
|
||||
}
|
||||
} else if is_active(name).await && !is_frozen(name).await{
|
||||
let watchers = watchers.clone();
|
||||
// println!("mutex: {:?}", watchers);
|
||||
let mut buffer = [0; 128];
|
||||
let mut mutex_guard = watchers.lock().await;
|
||||
if let Some(notify) = mutex_guard.get_mut(i) {
|
||||
let events = notify.read_events(&mut buffer);
|
||||
// println!("{:?}", events);
|
||||
if events.is_ok(){
|
||||
let events: Vec<EventMask> = events.unwrap()
|
||||
.into_iter()
|
||||
.map(|mask| {mask.mask})
|
||||
.filter(|mask| {
|
||||
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
|
||||
})
|
||||
.collect();
|
||||
for event in events {
|
||||
if let EventMask::DELETE_SELF = event {
|
||||
// ! warning (DELETE_SELF event) !
|
||||
// println!("! warning (DELETE_SELF event) !");
|
||||
// * watcher recreation after dealing with file recreation mech in text editors
|
||||
let mutex = notify.borrow_mut();
|
||||
|
||||
*mutex = create_watcher(&file.filename, &file.src).await.unwrap();
|
||||
}
|
||||
match file.triggers.on_change.as_str() {
|
||||
"stop" => {
|
||||
let _ = tx.send(7).await;
|
||||
},
|
||||
"restart" => {
|
||||
let _ = tx.send(8).await;
|
||||
},
|
||||
"stay" => {
|
||||
let _ = tx.send(9).await;
|
||||
},
|
||||
_ => {
|
||||
let _ = tx.send(101).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::task::yield_now().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
|
||||
let arc_name = Arc::new(filename.to_string());
|
||||
let arc_path = Arc::new(path.to_string());
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let fileconcat = format!("{}{}", arc_path, arc_name);
|
||||
let path = Path::new(&fileconcat);
|
||||
if path.exists() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(CustomError::Fatal)
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_| {
|
||||
panic!("Corrupted while file check process");
|
||||
})
|
||||
}
|
||||
777
src/main.rs
777
src/main.rs
|
|
@ -1,109 +1,23 @@
|
|||
use redis::{Client, Commands, Connection};
|
||||
// json parsing
|
||||
use serde::{ Deserialize, Serialize };
|
||||
use serde_json;
|
||||
// async multi-threaded execution
|
||||
use tokio::time::{ Duration, Instant };
|
||||
mod structs;
|
||||
mod config;
|
||||
mod files;
|
||||
mod prcs;
|
||||
mod utils;
|
||||
mod services;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::join;
|
||||
// fatal errors handler
|
||||
use core::panic;
|
||||
use std::borrow::BorrowMut;
|
||||
// utils
|
||||
use std::fmt::Debug;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::process::{ Command, Output };
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
// file change handler
|
||||
use inotify::{ EventMask, Inotify, WatchMask };
|
||||
// logging
|
||||
use std::io::Write;
|
||||
use chrono::Local;
|
||||
use env_logger::Builder;
|
||||
use log::{error, info, warn, LevelFilter};
|
||||
use std::net::{TcpStream, ToSocketAddrs};
|
||||
use log::{error, LevelFilter};
|
||||
use structs::*;
|
||||
use config::*;
|
||||
use utils::*;
|
||||
|
||||
static CONFIG_PATH : &'static str = "settings.json";
|
||||
static GET_ID_CMD : &'static str = "cat /proc/self/mountinfo | grep '/docker/containers/' | head -1 | awk -F '/' '{print $5}'";
|
||||
|
||||
/// # an Error enum (nextly will be deleted and replaced)
|
||||
enum CustomError {
|
||||
Fatal,
|
||||
}
|
||||
enum ConfigActuality {
|
||||
Local,
|
||||
Remote,
|
||||
}
|
||||
|
||||
/// # struct for the 1st level in json conf file
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct Processes {
|
||||
// #[serde(rename="id")]
|
||||
// runner_id: usize,
|
||||
#[serde(rename="dateOfCreation")]
|
||||
date_of_creation : String,
|
||||
#[serde(default)]
|
||||
processes : Vec<TrackingProcess>,
|
||||
}
|
||||
|
||||
/// # struct for each process to contain info, such as name, path and dependencies
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct TrackingProcess {
|
||||
name : String,
|
||||
path : String,
|
||||
dependencies: Dependencies,
|
||||
}
|
||||
|
||||
/// # struct for processes' dependecies including files and services
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct Dependencies {
|
||||
#[serde(default)]
|
||||
files : Vec<Files>,
|
||||
#[serde(default)]
|
||||
services: Vec<Services>,
|
||||
}
|
||||
|
||||
/// # struct for containing file object with its triggers to manipulate in daemons
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct Files {
|
||||
filename : String,
|
||||
src : String,
|
||||
triggers : FIleTriggers,
|
||||
}
|
||||
|
||||
/// # struct for containing service object with its triggers to manipulate in daemons
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct Services {
|
||||
hostname : String,
|
||||
port : u32,
|
||||
triggers : ServiceTriggers,
|
||||
}
|
||||
|
||||
/// # struct for instancing each service's policies such as on lost or time to wait till reachable
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct ServiceTriggers {
|
||||
wait : u32,
|
||||
delay: u32,
|
||||
#[serde(rename="onLost")]
|
||||
on_lost : String,
|
||||
}
|
||||
|
||||
/// # struct for instancing each file's policies such as ondelete or onupdate events
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct FIleTriggers {
|
||||
#[serde(rename="onDelete")]
|
||||
on_delete : String,
|
||||
#[serde(rename="onChange")]
|
||||
on_change : String,
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
async fn main() {
|
||||
|
|
@ -165,673 +79,6 @@ async fn main() {
|
|||
return;
|
||||
}
|
||||
|
||||
fn get_actual_config() -> Option<Processes> {
|
||||
|
||||
// todo: local check Some|None -> redis check
|
||||
// * if no conf -> loop and +inf getting conf from redis server
|
||||
// let mut local = load_processes(&CONFIG_PATH);
|
||||
match load_processes(&CONFIG_PATH) {
|
||||
Some(local_conf) => {
|
||||
if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") {
|
||||
match config_comparing(&local_conf, &remote_conf) {
|
||||
ConfigActuality::Local => {
|
||||
info!("Local config is actual");
|
||||
return Some(local_conf);
|
||||
},
|
||||
ConfigActuality::Remote => {
|
||||
info!("Pulled config is more actual. Saving changes!");
|
||||
if save_new_config(&remote_conf, &CONFIG_PATH).is_err() {
|
||||
error!("Saving changes process failed due to unexpected error...")
|
||||
}
|
||||
return Some(remote_conf);
|
||||
},
|
||||
}
|
||||
}
|
||||
return Some(local_conf);
|
||||
},
|
||||
None => {
|
||||
// ? ? OUTSTANDING CONSTRUCTION ?
|
||||
let mut conn = get_connection_watcher(&open_watcher("redis://localhost"));
|
||||
get_stream_info_watcher(&mut conn);
|
||||
let remote_config = invalid_config_watcher(&mut conn);
|
||||
let _ = save_new_config(&remote_config, &CONFIG_PATH);
|
||||
Some(remote_config)
|
||||
},
|
||||
}
|
||||
}
|
||||
// ! once iter exec
|
||||
// ! only for situation when local isnt None (no need to fck redis server)
|
||||
fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
|
||||
match redis::Client::open(serv_info) {
|
||||
Ok(client) => {
|
||||
match client.get_connection() {
|
||||
Ok(mut conn) => {
|
||||
if let Ok(len) = conn.xlen::<&str, usize>("config_stream") {
|
||||
if len == 0 {
|
||||
warn!("No configuration in DB yet");
|
||||
return None;
|
||||
} else {
|
||||
let conf: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>> = conn.xrevrange_count("config_stream", "+", "-", 1);
|
||||
let config: &Vec<(String, Vec<(String, String)>)>;
|
||||
|
||||
if conf.is_ok() {
|
||||
// guarranted safe unwrapping
|
||||
let conf = conf.unwrap();
|
||||
config = &conf[0];
|
||||
if config.is_empty() {
|
||||
error!("Empty config was pulled. Check stream and configs state!");
|
||||
return None;
|
||||
}
|
||||
match parse_extern_config(&config[0].1[0].1) {
|
||||
Some(prcs) => return Some(prcs),
|
||||
None => {
|
||||
error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!");
|
||||
return None;
|
||||
},
|
||||
}
|
||||
} else {
|
||||
error!("Configuration pulling from Redis stream failed. Check stream state!");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("Cannot find config_stream. Check Redis-stream accessibility!");
|
||||
return None;
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis connection attempt is failed. Check Redis configuration!");
|
||||
return None;
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis-Client opening attempt is failed. Check network configuration!");
|
||||
return None;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ! watchers
|
||||
|
||||
fn open_watcher(serv_info: &str) -> redis::Client {
|
||||
loop {
|
||||
match redis::Client::open(serv_info) {
|
||||
Ok(redis) => {
|
||||
info!("Succesfully opened Redis-Client");
|
||||
return redis
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis-Client opening attempt is failed. Check network configuration! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_connection_watcher(client: &Client) -> Connection {
|
||||
loop {
|
||||
match client.get_connection() {
|
||||
Ok(conn) => {
|
||||
info!("Succesfully got Redis connection object");
|
||||
return conn;
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis connection attempt is failed. Check Redis configuration! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fn get_stream_info_watcher(conn: &mut Connection) {
|
||||
loop {
|
||||
if let Ok(val) = conn.xlen::<&str, usize>("config_stream") {
|
||||
if val != 0 {
|
||||
info!("Redis stream is able and not empty now");
|
||||
return;
|
||||
}
|
||||
}
|
||||
error!("Configuration pulling from Redis stream failed. Check stream state! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
fn invalid_config_watcher(conn: &mut Connection) -> Processes {
|
||||
// let res: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>>;
|
||||
loop {
|
||||
let res: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>> = conn.xrevrange_count("config_stream", "+", "-", 1);
|
||||
if res.is_ok() {
|
||||
let config = &res.unwrap()[0];
|
||||
if !config.is_empty() {
|
||||
if let Some(conf) = parse_extern_config(&config[0].1[0].1) {
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("Got INVALID configuration. Update config! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
|
||||
// ! end of watchers
|
||||
|
||||
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
||||
let local_date: u64 = local.date_of_creation.parse().unwrap();
|
||||
let remote_date: u64 = remote.date_of_creation.parse().unwrap();
|
||||
|
||||
match local_date.cmp(&remote_date) {
|
||||
std::cmp::Ordering::Equal |
|
||||
std::cmp::Ordering::Greater => return ConfigActuality::Local,
|
||||
std::cmp::Ordering::Less => return ConfigActuality::Remote,
|
||||
}
|
||||
}
|
||||
|
||||
// ! TEMPORARLY DEPRICATED !
|
||||
// fn native_date_from_milis(mls: &str) -> Option<chrono::DateTime<Utc>> {
|
||||
// match mls.parse::<i64>(){
|
||||
// Ok(val) => return chrono::DateTime::from_timestamp_millis(val),
|
||||
// Err(_) => return None,
|
||||
// }
|
||||
// }
|
||||
|
||||
fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> {
|
||||
match serde_json::to_string_pretty(&config) {
|
||||
Ok(st) => {
|
||||
match fs::write(config_file, st) {
|
||||
Ok(_) => return Ok(()),
|
||||
Err(_) => return Err(CustomError::Fatal),
|
||||
}
|
||||
},
|
||||
Err(_) => return Err(CustomError::Fatal),
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
|
||||
let src = format!("{}{}", path, filename);
|
||||
let mut inotify = Inotify::init().unwrap_or_else(|_| {
|
||||
error!("{}",format!("Cannot create watcher for {}", &src));
|
||||
std::process::exit(101);
|
||||
});
|
||||
_ = inotify
|
||||
.watches()
|
||||
.add(
|
||||
&src,
|
||||
WatchMask::ALL_EVENTS
|
||||
);
|
||||
|
||||
Ok(inotify)
|
||||
}
|
||||
|
||||
/// # async func to run 3 main daemons (now its more like tree-form than classiacl 0.1.0 form )
|
||||
/// > hint : give mpsc with capacity 1 to jump over potential errors during running process
|
||||
/// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") **
|
||||
async fn run_daemons(
|
||||
proc: Arc<TrackingProcess>,
|
||||
tx: Arc<mpsc::Sender<u8>>,
|
||||
rx: &mut mpsc::Receiver<u8>
|
||||
)
|
||||
{
|
||||
// creating watchers + ---buffers---
|
||||
let mut watchers: Vec<Inotify> = vec![];
|
||||
for file in proc.dependencies.files.clone().into_iter() {
|
||||
watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
|
||||
}
|
||||
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> = Arc::new(tokio::sync::Mutex::new(watchers));
|
||||
|
||||
loop {
|
||||
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
|
||||
tokio::select! {
|
||||
_ = run_hand => {},
|
||||
_val = rx.recv() => {
|
||||
match _val.unwrap() {
|
||||
// 1 - File-dependency handling error -> terminating (after waiting)
|
||||
1 => {
|
||||
if is_active(&proc.name).await {
|
||||
error!("Dependency handling error: Terminating {} process ..." , &proc.name);
|
||||
terminate_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
// break;
|
||||
},
|
||||
// 2 - File-dependency handling error -> holding (after waiting)
|
||||
2 => {
|
||||
if !is_frozen(&proc.name).await {
|
||||
error!("Dependency handling error: Freezing {} process ..." , &proc.name);
|
||||
freeze_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
},
|
||||
// 3 - Running process error
|
||||
3 => {
|
||||
error!("Error due to starting {} process", &proc.name);
|
||||
},
|
||||
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
|
||||
4 => {
|
||||
warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
|
||||
},
|
||||
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
|
||||
5 => {
|
||||
if is_active(&proc.name).await {
|
||||
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
|
||||
terminate_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
// break;
|
||||
},
|
||||
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
|
||||
6 => {
|
||||
if !is_frozen(&proc.name).await {
|
||||
error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
|
||||
freeze_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
},
|
||||
// // 7 - File-dependency change -> terminating (after check)
|
||||
7 => {
|
||||
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
|
||||
terminate_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
},
|
||||
// // 8 - File-dependency change -> restarting (after check)
|
||||
8 => {
|
||||
warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
|
||||
let _ = restart_process(&proc.name, &proc.path).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
},
|
||||
// // 9 - File-dependency change -> staying (after check)
|
||||
9 => {
|
||||
warn!("File-dependency warning (file changed). Ignoring on {} process...", &proc.name);
|
||||
},
|
||||
|
||||
// 10 - Process unfreaze call via file handler
|
||||
10 => {
|
||||
if is_frozen(&proc.name).await {
|
||||
warn!("Unfreezing process {} call...", &proc.name);
|
||||
unfreeze_process(&proc.name).await;
|
||||
}
|
||||
},
|
||||
// 11 - Process unfreaze call via service handler
|
||||
11 => {
|
||||
if is_frozen(&proc.name).await {
|
||||
warn!("Unfreezing process {} call...", &proc.name);
|
||||
unfreeze_process(&proc.name).await;
|
||||
}
|
||||
},
|
||||
// 101 - Impermissible trigger values in JSON
|
||||
101 => {
|
||||
error!("Impermissible trigger values in JSON");
|
||||
if is_active(&proc.name).await {
|
||||
terminate_process(&proc.name).await;
|
||||
}
|
||||
break;
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
},
|
||||
}
|
||||
// tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
|
||||
// 4ever sync
|
||||
fn load_processes(json_filename: &str) -> Option<Processes>{
|
||||
match fs::read_to_string(json_filename) {
|
||||
Ok(res) => {
|
||||
match serde_json::from_str::<Processes>(&res) {
|
||||
Ok(conf) => {
|
||||
return Some(conf);
|
||||
},
|
||||
Err(_) => {
|
||||
return None;
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
return None;
|
||||
},
|
||||
}
|
||||
}
|
||||
fn parse_extern_config(json_string: &str) -> Option<Processes> {
|
||||
let des = serde_json::from_str::<Processes>(json_string);
|
||||
if des.is_err() {
|
||||
return None;
|
||||
} else {
|
||||
return Some(des.unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_pid(name: &str) -> Output {
|
||||
let name = Arc::new(name.to_string());
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Command::new("pidof")
|
||||
.arg(&*name)
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to execute command 'pidof'");
|
||||
std::process::exit(101);
|
||||
})
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
// ! can be with bug !!!
|
||||
// * APPROVED
|
||||
async fn is_active(name: &str)-> bool {
|
||||
let arc_name = Arc::new(name.to_string());
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let output = Command::new("pidof")
|
||||
.arg(&*arc_name)
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to execute command 'pidof'");
|
||||
std::process::exit(101);
|
||||
});
|
||||
!String::from_utf8_lossy(&output.stdout).trim().is_empty()
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
// T is for stopped processes
|
||||
async fn is_frozen(name: &str) -> bool {
|
||||
let temp = get_pid(name).await;
|
||||
let pid = String::from_utf8_lossy(&temp.stdout);
|
||||
let pid = pid.trim();
|
||||
let arc_pid = Arc::new(pid.to_string());
|
||||
if pid.is_empty(){
|
||||
return false;
|
||||
} else {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let cmd = Command::new("ps")
|
||||
.args(["-o", "stat=", "-p", &arc_pid])
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to execute ps command");
|
||||
std::process::exit(101);
|
||||
});
|
||||
String::from_utf8_lossy(&cmd.stdout).contains("T")
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
async fn terminate_process (name: &str) {
|
||||
let _ = Command::new("pkill")
|
||||
.arg(name)
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to execute command 'pkill'");
|
||||
std::process::exit(101);
|
||||
});
|
||||
}
|
||||
// another test
|
||||
async fn freeze_process(name: &str) {
|
||||
let _ = Command::new("pkill")
|
||||
.args(["-STOP", name])
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to freeze process");
|
||||
std::process::exit(101);
|
||||
});
|
||||
}
|
||||
async fn unfreeze_process(name: &str) {
|
||||
let _ = Command::new("pkill")
|
||||
.args(["-CONT", name])
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to unfreeze process");
|
||||
std::process::exit(101);
|
||||
});
|
||||
}
|
||||
async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> {
|
||||
terminate_process(name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
return start_process(name, path).await;
|
||||
}
|
||||
|
||||
async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
|
||||
let runsh = format!("{}{}", path, "/run.sh");
|
||||
let mut command = Command::new("bash");
|
||||
command.arg(runsh);
|
||||
|
||||
match command.spawn() {
|
||||
Ok(_) => {
|
||||
warn!("Process {} is running now!", name);
|
||||
Ok(())
|
||||
},
|
||||
Err(_) => {
|
||||
return Err(CustomError::Fatal)
|
||||
},
|
||||
}
|
||||
}
|
||||
// check process status daemon
|
||||
async fn running_handler
|
||||
(
|
||||
prc: Arc<TrackingProcess>,
|
||||
tx: Arc<mpsc::Sender<u8>>,
|
||||
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>
|
||||
)
|
||||
{
|
||||
// println!("running daemon on {}", prc.name);
|
||||
// services and files check (once)
|
||||
let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone());
|
||||
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
|
||||
|
||||
let res = join!(files_check, services_check);
|
||||
// if inactive -> spawn checks -> active is true
|
||||
if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok(){
|
||||
if start_process(&prc.name, &prc.path).await.is_err() {
|
||||
tx.send(3).await.unwrap();
|
||||
return;
|
||||
}
|
||||
}
|
||||
// if frozen -> spawn checks -> unfreeze is true
|
||||
else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok(){
|
||||
tx.send(10).await.unwrap();
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
|
||||
async fn file_handler
|
||||
(
|
||||
name: &str,
|
||||
files: &Vec<Files>,
|
||||
tx: Arc<mpsc::Sender<u8>>,
|
||||
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>
|
||||
) -> Result<(), CustomError>
|
||||
{
|
||||
// println!("file daemon on {}", name);
|
||||
for (i, file) in files.iter().enumerate() {
|
||||
// let src = format!("{}{}", file.src, file.filename);
|
||||
if check_file(&file.filename, &file.src).await.is_err() {
|
||||
if !is_active(name).await || is_frozen(name).await {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
match file.triggers.on_delete.as_str() {
|
||||
"stay" => {
|
||||
continue;
|
||||
},
|
||||
"stop" => {
|
||||
if is_active(name).await {
|
||||
tx.send(1).await.unwrap();
|
||||
}
|
||||
return Err(CustomError::Fatal);
|
||||
},
|
||||
"hold" => {
|
||||
if is_active(name).await {
|
||||
tx.send(2).await.unwrap();
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
tx.send(101).await.unwrap();
|
||||
return Err(CustomError::Fatal);
|
||||
},
|
||||
}
|
||||
} else if is_active(name).await && !is_frozen(name).await{
|
||||
let watchers = watchers.clone();
|
||||
// println!("mutex: {:?}", watchers);
|
||||
let mut buffer = [0; 128];
|
||||
let mut mutex_guard = watchers.lock().await;
|
||||
if let Some(notify) = mutex_guard.get_mut(i) {
|
||||
let events = notify.read_events(&mut buffer);
|
||||
// println!("{:?}", events);
|
||||
if events.is_ok(){
|
||||
let events: Vec<EventMask> = events.unwrap()
|
||||
.into_iter()
|
||||
.map(|mask| {mask.mask})
|
||||
.filter(|mask| {
|
||||
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
|
||||
})
|
||||
.collect();
|
||||
for event in events {
|
||||
if let EventMask::DELETE_SELF = event {
|
||||
// ! warning (DELETE_SELF event) !
|
||||
// println!("! warning (DELETE_SELF event) !");
|
||||
// * watcher recreation after dealing with file recreation mech in text editors
|
||||
let mutex = notify.borrow_mut();
|
||||
|
||||
*mutex = create_watcher(&file.filename, &file.src).await.unwrap();
|
||||
}
|
||||
match file.triggers.on_change.as_str() {
|
||||
"stop" => {
|
||||
let _ = tx.send(7).await;
|
||||
},
|
||||
"restart" => {
|
||||
let _ = tx.send(8).await;
|
||||
},
|
||||
"stay" => {
|
||||
let _ = tx.send(9).await;
|
||||
},
|
||||
_ => {
|
||||
let _ = tx.send(101).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::task::yield_now().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
|
||||
let arc_name = Arc::new(filename.to_string());
|
||||
let arc_path = Arc::new(path.to_string());
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let fileconcat = format!("{}{}", arc_path, arc_name);
|
||||
let path = Path::new(&fileconcat);
|
||||
if path.exists() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(CustomError::Fatal)
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_| {
|
||||
panic!("Corrupted while file check process");
|
||||
})
|
||||
}
|
||||
|
||||
async fn service_handler(name: &str, services: &Vec<Services>, tx: Arc<mpsc::Sender<u8>>) -> Result<(), CustomError> {
|
||||
// println!("service daemon on {}", name);
|
||||
for serv in services {
|
||||
if check_service(&serv.hostname, &serv.port).await.is_err() {
|
||||
if !is_active(name).await || is_frozen(name).await {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
error!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name);
|
||||
match serv.triggers.on_lost.as_str() {
|
||||
"stay" => {
|
||||
},
|
||||
"stop" => {
|
||||
if looped_service_connecting(name, serv).await.is_err() {
|
||||
tx.send(5).await.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
},
|
||||
"hold" => {
|
||||
if is_frozen(name).await {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
if looped_service_connecting(name, serv).await.is_err() {
|
||||
tx.send(6).await.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
tx.send(101).await.unwrap();
|
||||
return Err(CustomError::Fatal);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
tokio::task::yield_now().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn looped_service_connecting(
|
||||
name: &str,
|
||||
serv: &Services
|
||||
) -> Result<(), CustomError>
|
||||
{
|
||||
if serv.triggers.wait == 0 {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||
warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
|
||||
match check_service(&serv.hostname, &serv.port).await {
|
||||
Ok(_) => {
|
||||
log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name);
|
||||
break;
|
||||
},
|
||||
Err(_) => {
|
||||
continue;
|
||||
},
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
} else {
|
||||
let start = Instant::now();
|
||||
while start.elapsed().as_secs() < serv.triggers.wait.into() {
|
||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||
warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
|
||||
match check_service(&serv.hostname, &serv.port).await {
|
||||
Ok(_) => {
|
||||
log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name);
|
||||
return Ok(());
|
||||
},
|
||||
Err(_) => {
|
||||
continue;
|
||||
},
|
||||
}
|
||||
}
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
}
|
||||
|
||||
// ! have to be rewritten
|
||||
// todo: rewrite use
|
||||
async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
|
||||
let addr = format!("{}:{}", hostname, port);
|
||||
|
||||
match addr.to_socket_addrs() {
|
||||
Ok(mut addrs) => {
|
||||
if let Some(_) = addrs.find(|a| TcpStream::connect_timeout(a, std::time::Duration::new(1, 0)).is_ok()) {
|
||||
return Ok(());
|
||||
} else {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
},
|
||||
Err(_) => return Err(CustomError::Fatal),
|
||||
}
|
||||
}
|
||||
|
||||
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
|
||||
fn get_container_id() -> Option<String> {
|
||||
match Command::new(GET_ID_CMD).output() {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,110 @@
|
|||
use std::sync::Arc;
|
||||
use std::process::{ Command, Output };
|
||||
use log::{error, warn};
|
||||
use tokio::time::Duration;
|
||||
use crate::structs::CustomError;
|
||||
|
||||
pub async fn get_pid(name: &str) -> Output {
|
||||
let name = Arc::new(name.to_string());
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Command::new("pidof")
|
||||
.arg(&*name)
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to execute command 'pidof'");
|
||||
std::process::exit(101);
|
||||
})
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
// ! can be with bug !!!
|
||||
// * APPROVED
|
||||
pub async fn is_active(name: &str)-> bool {
|
||||
let arc_name = Arc::new(name.to_string());
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let output = Command::new("pidof")
|
||||
.arg(&*arc_name)
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to execute command 'pidof'");
|
||||
std::process::exit(101);
|
||||
});
|
||||
!String::from_utf8_lossy(&output.stdout).trim().is_empty()
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
// T is for stopped processes
|
||||
pub async fn is_frozen(name: &str) -> bool {
|
||||
let temp = get_pid(name).await;
|
||||
let pid = String::from_utf8_lossy(&temp.stdout);
|
||||
let pid = pid.trim();
|
||||
let arc_pid = Arc::new(pid.to_string());
|
||||
if pid.is_empty(){
|
||||
return false;
|
||||
} else {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let cmd = Command::new("ps")
|
||||
.args(["-o", "stat=", "-p", &arc_pid])
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to execute ps command");
|
||||
std::process::exit(101);
|
||||
});
|
||||
String::from_utf8_lossy(&cmd.stdout).contains("T")
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
pub async fn terminate_process (name: &str) {
|
||||
let _ = Command::new("pkill")
|
||||
.arg(name)
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to execute command 'pkill'");
|
||||
std::process::exit(101);
|
||||
});
|
||||
}
|
||||
// another test
|
||||
pub async fn freeze_process(name: &str) {
|
||||
let _ = Command::new("pkill")
|
||||
.args(["-STOP", name])
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to freeze process");
|
||||
std::process::exit(101);
|
||||
});
|
||||
}
|
||||
pub async fn unfreeze_process(name: &str) {
|
||||
let _ = Command::new("pkill")
|
||||
.args(["-CONT", name])
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to unfreeze process");
|
||||
std::process::exit(101);
|
||||
});
|
||||
}
|
||||
pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> {
|
||||
terminate_process(name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
return start_process(name, path).await;
|
||||
}
|
||||
|
||||
pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
|
||||
let runsh = format!("{}{}", path, "/run.sh");
|
||||
let mut command = Command::new("bash");
|
||||
command.arg(runsh);
|
||||
|
||||
match command.spawn() {
|
||||
Ok(_) => {
|
||||
warn!("Process {} is running now!", name);
|
||||
Ok(())
|
||||
},
|
||||
Err(_) => {
|
||||
return Err(CustomError::Fatal)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,104 @@
|
|||
use std::sync::Arc;
|
||||
use tokio::time::{ Duration, Instant };
|
||||
use tokio::sync::mpsc;
|
||||
use crate::structs::{Services, CustomError};
|
||||
use crate::prcs::{is_active, is_frozen};
|
||||
use log::{error, warn};
|
||||
use std::net::{TcpStream, ToSocketAddrs};
|
||||
|
||||
|
||||
pub async fn service_handler(name: &str, services: &Vec<Services>, tx: Arc<mpsc::Sender<u8>>) -> Result<(), CustomError> {
|
||||
// println!("service daemon on {}", name);
|
||||
for serv in services {
|
||||
if check_service(&serv.hostname, &serv.port).await.is_err() {
|
||||
if !is_active(name).await || is_frozen(name).await {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
error!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name);
|
||||
match serv.triggers.on_lost.as_str() {
|
||||
"stay" => {
|
||||
},
|
||||
"stop" => {
|
||||
if looped_service_connecting(name, serv).await.is_err() {
|
||||
tx.send(5).await.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
},
|
||||
"hold" => {
|
||||
if is_frozen(name).await {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
if looped_service_connecting(name, serv).await.is_err() {
|
||||
tx.send(6).await.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
tx.send(101).await.unwrap();
|
||||
return Err(CustomError::Fatal);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
tokio::task::yield_now().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn looped_service_connecting(
|
||||
name: &str,
|
||||
serv: &Services
|
||||
) -> Result<(), CustomError>
|
||||
{
|
||||
if serv.triggers.wait == 0 {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||
warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
|
||||
match check_service(&serv.hostname, &serv.port).await {
|
||||
Ok(_) => {
|
||||
log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name);
|
||||
break;
|
||||
},
|
||||
Err(_) => {
|
||||
continue;
|
||||
},
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
} else {
|
||||
let start = Instant::now();
|
||||
while start.elapsed().as_secs() < serv.triggers.wait.into() {
|
||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||
warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
|
||||
match check_service(&serv.hostname, &serv.port).await {
|
||||
Ok(_) => {
|
||||
log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name);
|
||||
return Ok(());
|
||||
},
|
||||
Err(_) => {
|
||||
continue;
|
||||
},
|
||||
}
|
||||
}
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
}
|
||||
|
||||
// ! have to be rewritten
|
||||
// todo: rewrite use
|
||||
async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
|
||||
let addr = format!("{}:{}", hostname, port);
|
||||
|
||||
match addr.to_socket_addrs() {
|
||||
Ok(mut addrs) => {
|
||||
if let Some(_) = addrs.find(|a| TcpStream::connect_timeout(a, std::time::Duration::new(1, 0)).is_ok()) {
|
||||
return Ok(());
|
||||
} else {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
},
|
||||
Err(_) => return Err(CustomError::Fatal),
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,79 @@
|
|||
use serde::{ Deserialize, Serialize };
|
||||
|
||||
/// # an Error enum (nextly will be deleted and replaced)
|
||||
pub enum CustomError {
|
||||
Fatal,
|
||||
}
|
||||
pub enum ConfigActuality {
|
||||
Local,
|
||||
Remote,
|
||||
}
|
||||
|
||||
/// # struct for the 1st level in json conf file
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Processes {
|
||||
// #[serde(rename="id")]
|
||||
// runner_id: usize,
|
||||
#[serde(rename="dateOfCreation")]
|
||||
pub date_of_creation : String,
|
||||
#[serde(default)]
|
||||
pub processes : Vec<TrackingProcess>,
|
||||
}
|
||||
|
||||
/// # struct for each process to contain info, such as name, path and dependencies
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TrackingProcess {
|
||||
pub name : String,
|
||||
pub path : String,
|
||||
pub dependencies: Dependencies,
|
||||
}
|
||||
|
||||
/// # struct for processes' dependecies including files and services
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Dependencies {
|
||||
#[serde(default)]
|
||||
pub files : Vec<Files>,
|
||||
#[serde(default)]
|
||||
pub services: Vec<Services>,
|
||||
}
|
||||
|
||||
/// # struct for containing file object with its triggers to manipulate in daemons
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Files {
|
||||
pub filename : String,
|
||||
pub src : String,
|
||||
pub triggers : FIleTriggers,
|
||||
}
|
||||
|
||||
/// # struct for containing service object with its triggers to manipulate in daemons
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Services {
|
||||
pub hostname : String,
|
||||
pub port : u32,
|
||||
pub triggers : ServiceTriggers,
|
||||
}
|
||||
|
||||
/// # struct for instancing each service's policies such as on lost or time to wait till reachable
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct ServiceTriggers {
|
||||
pub wait : u32,
|
||||
pub delay: u32,
|
||||
#[serde(rename="onLost")]
|
||||
pub on_lost : String,
|
||||
}
|
||||
|
||||
/// # struct for instancing each file's policies such as ondelete or onupdate events
|
||||
/// > (needed in serialization and deserialization)
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct FIleTriggers {
|
||||
#[serde(rename="onDelete")]
|
||||
pub on_delete : String,
|
||||
#[serde(rename="onChange")]
|
||||
pub on_change : String,
|
||||
}
|
||||
|
|
@ -0,0 +1,159 @@
|
|||
use std::sync::Arc;
|
||||
use crate::structs::TrackingProcess;
|
||||
use tokio::sync::mpsc;
|
||||
use inotify::Inotify;
|
||||
use crate::files::create_watcher;
|
||||
use log::{error, warn};
|
||||
use crate::prcs::{
|
||||
is_active,
|
||||
is_frozen,
|
||||
terminate_process,
|
||||
restart_process,
|
||||
freeze_process,
|
||||
unfreeze_process,
|
||||
start_process
|
||||
};
|
||||
use tokio::time::Duration;
|
||||
use tokio::join;
|
||||
use crate::files::file_handler;
|
||||
use crate::services::service_handler;
|
||||
|
||||
/// # async func to run 3 main daemons (now its more like tree-form than classiacl 0.1.0 form )
|
||||
/// > hint : give mpsc with capacity 1 to jump over potential errors during running process
|
||||
/// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") **
|
||||
pub async fn run_daemons(
|
||||
proc: Arc<TrackingProcess>,
|
||||
tx: Arc<mpsc::Sender<u8>>,
|
||||
rx: &mut mpsc::Receiver<u8>
|
||||
)
|
||||
{
|
||||
// creating watchers + ---buffers---
|
||||
let mut watchers: Vec<Inotify> = vec![];
|
||||
for file in proc.dependencies.files.clone().into_iter() {
|
||||
watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
|
||||
}
|
||||
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> = Arc::new(tokio::sync::Mutex::new(watchers));
|
||||
|
||||
loop {
|
||||
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
|
||||
tokio::select! {
|
||||
_ = run_hand => {},
|
||||
_val = rx.recv() => {
|
||||
match _val.unwrap() {
|
||||
// 1 - File-dependency handling error -> terminating (after waiting)
|
||||
1 => {
|
||||
if is_active(&proc.name).await {
|
||||
error!("Dependency handling error: Terminating {} process ..." , &proc.name);
|
||||
terminate_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
// break;
|
||||
},
|
||||
// 2 - File-dependency handling error -> holding (after waiting)
|
||||
2 => {
|
||||
if !is_frozen(&proc.name).await {
|
||||
error!("Dependency handling error: Freezing {} process ..." , &proc.name);
|
||||
freeze_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
},
|
||||
// 3 - Running process error
|
||||
3 => {
|
||||
error!("Error due to starting {} process", &proc.name);
|
||||
},
|
||||
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
|
||||
4 => {
|
||||
warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
|
||||
},
|
||||
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
|
||||
5 => {
|
||||
if is_active(&proc.name).await {
|
||||
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
|
||||
terminate_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
// break;
|
||||
},
|
||||
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
|
||||
6 => {
|
||||
if !is_frozen(&proc.name).await {
|
||||
error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
|
||||
freeze_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
},
|
||||
// // 7 - File-dependency change -> terminating (after check)
|
||||
7 => {
|
||||
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
|
||||
terminate_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
},
|
||||
// // 8 - File-dependency change -> restarting (after check)
|
||||
8 => {
|
||||
warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
|
||||
let _ = restart_process(&proc.name, &proc.path).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
},
|
||||
// // 9 - File-dependency change -> staying (after check)
|
||||
9 => {
|
||||
warn!("File-dependency warning (file changed). Ignoring on {} process...", &proc.name);
|
||||
},
|
||||
|
||||
// 10 - Process unfreaze call via file handler
|
||||
10 => {
|
||||
if is_frozen(&proc.name).await {
|
||||
warn!("Unfreezing process {} call...", &proc.name);
|
||||
unfreeze_process(&proc.name).await;
|
||||
}
|
||||
},
|
||||
// 11 - Process unfreaze call via service handler
|
||||
11 => {
|
||||
if is_frozen(&proc.name).await {
|
||||
warn!("Unfreezing process {} call...", &proc.name);
|
||||
unfreeze_process(&proc.name).await;
|
||||
}
|
||||
},
|
||||
// 101 - Impermissible trigger values in JSON
|
||||
101 => {
|
||||
error!("Impermissible trigger values in JSON");
|
||||
if is_active(&proc.name).await {
|
||||
terminate_process(&proc.name).await;
|
||||
}
|
||||
break;
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
},
|
||||
}
|
||||
// tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
// check process status daemon
|
||||
pub async fn running_handler
|
||||
(
|
||||
prc: Arc<TrackingProcess>,
|
||||
tx: Arc<mpsc::Sender<u8>>,
|
||||
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>
|
||||
)
|
||||
{
|
||||
// println!("running daemon on {}", prc.name);
|
||||
// services and files check (once)
|
||||
let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone());
|
||||
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
|
||||
|
||||
let res = join!(files_check, services_check);
|
||||
// if inactive -> spawn checks -> active is true
|
||||
if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok(){
|
||||
if start_process(&prc.name, &prc.path).await.is_err() {
|
||||
tx.send(3).await.unwrap();
|
||||
return;
|
||||
}
|
||||
}
|
||||
// if frozen -> spawn checks -> unfreeze is true
|
||||
else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok(){
|
||||
tx.send(10).await.unwrap();
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Loading…
Reference in New Issue