refactor + service checking bug with repeating twice

pull/9/head
prplV 2024-09-10 17:43:19 +03:00
parent deebb77ffb
commit 8c71453f29
9 changed files with 290 additions and 291 deletions

View File

@ -1,12 +1,14 @@
use std::fs;
use crate::structs::*; use crate::structs::*;
use log::{error, info, warn}; use log::{error, info, warn};
use redis::{Client, Commands, Connection}; use redis::{Client, Commands, Connection, RedisResult};
use std::fs;
use tokio::time::Duration; use tokio::time::Duration;
static CONFIG_PATH : &'static str = "settings.json"; static CONFIG_PATH: &str = "settings.json";
type Res = RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>>;
// 4ever sync // 4ever sync
fn load_processes(json_filename: &str) -> Option<Processes>{ fn load_processes(json_filename: &str) -> Option<Processes> {
if let Ok(res) = fs::read_to_string(json_filename) { if let Ok(res) = fs::read_to_string(json_filename) {
if let Ok(conf) = serde_json::from_str::<Processes>(&res) { if let Ok(conf) = serde_json::from_str::<Processes>(&res) {
return Some(conf); return Some(conf);
@ -18,39 +20,39 @@ fn load_processes(json_filename: &str) -> Option<Processes>{
pub fn get_actual_config() -> Option<Processes> { pub fn get_actual_config() -> Option<Processes> {
// * if no conf -> loop and +inf getting conf from redis server // * if no conf -> loop and +inf getting conf from redis server
// let mut local = load_processes(&CONFIG_PATH); // let mut local = load_processes(&CONFIG_PATH);
match load_processes(&CONFIG_PATH) { match load_processes(CONFIG_PATH) {
Some(local_conf) => { Some(local_conf) => {
if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") { if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") {
match config_comparing(&local_conf, &remote_conf) { return match config_comparing(&local_conf, &remote_conf) {
ConfigActuality::Local => { ConfigActuality::Local => {
info!("Local config is actual"); info!("Local config is actual");
return Some(local_conf); Some(local_conf)
}, }
ConfigActuality::Remote => { ConfigActuality::Remote => {
info!("Pulled config is more actual. Saving changes!"); info!("Pulled config is more actual. Saving changes!");
if save_new_config(&remote_conf, &CONFIG_PATH).is_err() { if save_new_config(&remote_conf, CONFIG_PATH).is_err() {
error!("Saving changes process failed due to unexpected error...") error!("Saving changes process failed due to unexpected error...")
} }
return Some(remote_conf); Some(remote_conf)
},
} }
};
} }
Some(local_conf) Some(local_conf)
}, }
None => { None => {
// ? ? OUTSTANDING CONSTRUCTION ? // ? ? OUTSTANDING CONSTRUCTION ?
let mut conn = get_connection_watcher(&open_watcher("redis://localhost")); let mut conn = get_connection_watcher(&open_watcher("redis://localhost"));
get_stream_info_watcher(&mut conn); get_stream_info_watcher(&mut conn);
let remote_config = invalid_config_watcher(&mut conn); let remote_config = invalid_config_watcher(&mut conn);
let _ = save_new_config(&remote_config, &CONFIG_PATH); let _ = save_new_config(&remote_config, CONFIG_PATH);
Some(remote_config) Some(remote_config)
}, }
} }
} }
// ! once iter exec // ! once iter exec
// ! only for situation when local isn't None (no need to fck redis server) // ! only for situation when local isn't None (no need to fck redis server)
fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> { fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
match redis::Client::open(serv_info) { match Client::open(serv_info) {
Ok(client) => { Ok(client) => {
match client.get_connection() { match client.get_connection() {
Ok(mut conn) => { Ok(mut conn) => {
@ -59,7 +61,7 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
warn!("No configuration in DB yet"); warn!("No configuration in DB yet");
None None
} else { } else {
let conf: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); let conf: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
let config: &Vec<(String, Vec<(String, String)>)>; let config: &Vec<(String, Vec<(String, String)>)>;
if conf.is_ok() { if conf.is_ok() {
@ -67,7 +69,9 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
let conf = conf.unwrap(); let conf = conf.unwrap();
config = &conf[0]; config = &conf[0];
if config.is_empty() { if config.is_empty() {
error!("Empty config was pulled. Check stream and configs state!"); error!(
"Empty config was pulled. Check stream and configs state!"
);
return None; return None;
} }
match parse_extern_config(&config[0].1[0].1) { match parse_extern_config(&config[0].1[0].1) {
@ -75,7 +79,7 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
None => { None => {
error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!");
None None
}, }
} }
} else { } else {
error!("Configuration pulling from Redis stream failed. Check stream state!"); error!("Configuration pulling from Redis stream failed. Check stream state!");
@ -86,29 +90,29 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
error!("Cannot find config_stream. Check Redis-stream accessibility!"); error!("Cannot find config_stream. Check Redis-stream accessibility!");
None None
} }
}, }
Err(_) => { Err(_) => {
error!("Redis connection attempt is failed. Check Redis configuration!"); error!("Redis connection attempt is failed. Check Redis configuration!");
None None
},
} }
}, }
}
Err(_) => { Err(_) => {
error!("Redis-Client opening attempt is failed. Check network configuration!"); error!("Redis-Client opening attempt is failed. Check network configuration!");
None None
}, }
} }
} }
// ! watchers // ! watchers
fn open_watcher(serv_info: &str) -> redis::Client { fn open_watcher(serv_info: &str) -> Client {
loop { loop {
match redis::Client::open(serv_info) { match Client::open(serv_info) {
Ok(redis) => { Ok(redis) => {
info!("Successfully opened Redis-Client"); info!("Successfully opened Redis-Client");
return redis return redis;
}, }
Err(_) => { Err(_) => {
error!("Redis-Client opening attempt is failed. Check network configuration! Retrying..."); error!("Redis-Client opening attempt is failed. Check network configuration! Retrying...");
std::thread::sleep(Duration::from_secs(4)); std::thread::sleep(Duration::from_secs(4));
@ -121,11 +125,13 @@ fn get_connection_watcher(client: &Client) -> Connection {
loop { loop {
match client.get_connection() { match client.get_connection() {
Ok(conn) => { Ok(conn) => {
info!("Succesfully got Redis connection object"); info!("Successfully got Redis connection object");
return conn; return conn;
}, }
Err(_) => { Err(_) => {
error!("Redis connection attempt is failed. Check Redis configuration! Retrying..."); error!(
"Redis connection attempt is failed. Check Redis configuration! Retrying..."
);
std::thread::sleep(Duration::from_secs(4)); std::thread::sleep(Duration::from_secs(4));
} }
} }
@ -144,9 +150,8 @@ fn get_stream_info_watcher(conn: &mut Connection) {
} }
} }
fn invalid_config_watcher(conn: &mut Connection) -> Processes { fn invalid_config_watcher(conn: &mut Connection) -> Processes {
// let res: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>>;
loop { loop {
let res: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
if res.is_ok() { if res.is_ok() {
let config = &res.unwrap()[0]; let config = &res.unwrap()[0];
if !config.is_empty() { if !config.is_empty() {
@ -167,14 +172,13 @@ fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
let remote_date: u64 = remote.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap();
match local_date.cmp(&remote_date) { match local_date.cmp(&remote_date) {
std::cmp::Ordering::Equal | std::cmp::Ordering::Equal | std::cmp::Ordering::Greater => ConfigActuality::Local,
std::cmp::Ordering::Greater => return ConfigActuality::Local, std::cmp::Ordering::Less => ConfigActuality::Remote,
std::cmp::Ordering::Less => return ConfigActuality::Remote,
} }
} }
// ! TEMPORARLY DEPRICATED ! // ! TEMPORARILY DEPRECATED !
// fn native_date_from_milis(mls: &str) -> Option<chrono::DateTime<Utc>> { // fn native_date_from_millis(mls: &str) -> Option<chrono::DateTime<Utc>> {
// match mls.parse::<i64>(){ // match mls.parse::<i64>(){
// Ok(val) => return chrono::DateTime::from_timestamp_millis(val), // Ok(val) => return chrono::DateTime::from_timestamp_millis(val),
// Err(_) => return None, // Err(_) => return None,
@ -183,21 +187,17 @@ fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> { fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> {
match serde_json::to_string_pretty(&config) { match serde_json::to_string_pretty(&config) {
Ok(st) => { Ok(st) => match fs::write(config_file, st) {
match fs::write(config_file, st) { Ok(_) => Ok(()),
Ok(_) => return Ok(()), Err(_) => Err(CustomError::Fatal),
Err(_) => return Err(CustomError::Fatal),
}
}, },
Err(_) => return Err(CustomError::Fatal), Err(_) => Err(CustomError::Fatal),
} }
} }
fn parse_extern_config(json_string: &str) -> Option<Processes> { fn parse_extern_config(json_string: &str) -> Option<Processes> {
let des = serde_json::from_str::<Processes>(json_string); if let Ok(des) = serde_json::from_str::<Processes>(json_string){
if des.is_err() { return Some(des);
return None;
} else {
return Some(des.unwrap());
} }
None
} }

View File

@ -1,37 +1,30 @@
use crate::structs::{Files, CustomError}; use crate::prcs::{is_active, is_frozen};
use inotify::{ EventMask, Inotify, WatchMask }; use crate::structs::{CustomError, Files};
use inotify::{EventMask, Inotify, WatchMask};
use log::error; use log::error;
use tokio::sync::mpsc;
use crate::prcs::{is_frozen, is_active};
use tokio::time::Duration;
use std::sync::Arc;
use std::path::Path;
use std::borrow::BorrowMut; use std::borrow::BorrowMut;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::Duration;
pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> { pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
let src = format!("{}{}", path, filename); let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init().unwrap_or_else(|_| { let inotify: Inotify = Inotify::init().unwrap_or_else(|_| {
error!("{}",format!("Cannot create watcher for {}", &src)); error!("{}", format!("Cannot create watcher for {}", &src));
std::process::exit(101); std::process::exit(101);
}); });
_ = inotify _ = inotify.watches().add(&src, WatchMask::ALL_EVENTS);
.watches()
.add(
&src,
WatchMask::ALL_EVENTS
);
Ok(inotify) Ok(inotify)
} }
pub async fn file_handler pub async fn file_handler(
(
name: &str, name: &str,
files: &Vec<Files>, files: &[Files],
tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>> watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> Result<(), CustomError> ) -> Result<(), CustomError> {
{
for (i, file) in files.iter().enumerate() { for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename); // let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() { if check_file(&file.filename, &file.src).await.is_err() {
@ -41,26 +34,26 @@ pub async fn file_handler
match file.triggers.on_delete.as_str() { match file.triggers.on_delete.as_str() {
"stay" => { "stay" => {
continue; continue;
}, }
"stop" => { "stop" => {
if is_active(name).await { if is_active(name).await {
tx.send(1).await.unwrap(); tx.send(1).await.unwrap();
} }
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
}, }
"hold" => { "hold" => {
if is_active(name).await { if is_active(name).await {
tx.send(2).await.unwrap(); tx.send(2).await.unwrap();
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
}, }
_ => { _ => {
tokio::time::sleep(Duration::from_millis(50)).await; tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap(); tx.send(101).await.unwrap();
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
},
} }
} else if is_active(name).await && !is_frozen(name).await{ }
} else if is_active(name).await && !is_frozen(name).await {
let watchers = watchers.clone(); let watchers = watchers.clone();
// println!("mutex: {:?}", watchers); // println!("mutex: {:?}", watchers);
let mut buffer = [0; 128]; let mut buffer = [0; 128];
@ -68,10 +61,10 @@ pub async fn file_handler
if let Some(notify) = mutex_guard.get_mut(i) { if let Some(notify) = mutex_guard.get_mut(i) {
let events = notify.read_events(&mut buffer); let events = notify.read_events(&mut buffer);
// println!("{:?}", events); // println!("{:?}", events);
if events.is_ok(){ if events.is_ok() {
let events: Vec<EventMask> = events.unwrap() let events: Vec<EventMask> = events
.into_iter() .unwrap()
.map(|mask| {mask.mask}) .map(|mask| mask.mask)
.filter(|mask| { .filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
}) })
@ -88,16 +81,16 @@ pub async fn file_handler
match file.triggers.on_change.as_str() { match file.triggers.on_change.as_str() {
"stop" => { "stop" => {
let _ = tx.send(7).await; let _ = tx.send(7).await;
}, }
"restart" => { "restart" => {
let _ = tx.send(8).await; let _ = tx.send(8).await;
}, }
"stay" => { "stay" => {
let _ = tx.send(9).await; let _ = tx.send(9).await;
}, }
_ => { _ => {
let _ = tx.send(101).await; let _ = tx.send(101).await;
}, }
} }
} }
} }
@ -108,7 +101,6 @@ pub async fn file_handler
Ok(()) Ok(())
} }
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string()); let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string()); let arc_path = Arc::new(path.to_string());

View File

@ -2,8 +2,8 @@
// use std::process::Command; // use std::process::Command;
use chrono::Local; use chrono::Local;
use env_logger::Builder; use env_logger::Builder;
use std::io::Write;
use log::LevelFilter; use log::LevelFilter;
use std::io::Write;
// use tokio::fs::File; // use tokio::fs::File;
// use tokio::io::BufWriter; // use tokio::io::BufWriter;
use crate::utils::get_container_id; use crate::utils::get_container_id;
@ -24,11 +24,11 @@ pub fn setup_logger() -> Result<(), crate::structs::CustomError> {
// std::process::exit(1); // std::process::exit(1);
// })); // }));
// building logger with current output format // building logger with current output format
Builder::new() Builder::new()
.format(move |buf, record|{ .format(move |buf, record| {
writeln!(buf, writeln!(
buf,
"Container-{}| {} [{}] - {}", "Container-{}| {} [{}] - {}",
get_container_id().unwrap_or("||NODE|".to_string()), get_container_id().unwrap_or("||NODE|".to_string()),
Local::now().format("%d-%m-%Y %H:%M:%S"), Local::now().format("%d-%m-%Y %H:%M:%S"),
@ -42,6 +42,5 @@ pub fn setup_logger() -> Result<(), crate::structs::CustomError> {
// .target(env_logger::Target::Pipe(log_target)) // .target(env_logger::Target::Pipe(log_target))
.init(); .init();
Ok(()) Ok(())
} }

View File

@ -1,21 +1,20 @@
mod structs;
mod config; mod config;
mod files; mod files;
mod prcs;
mod utils;
mod services;
mod logger; mod logger;
mod prcs;
mod services;
mod signals; mod signals;
mod structs;
mod utils;
use tokio::sync::mpsc;
use std::sync::Arc;
use log::{error, info};
use structs::*;
use config::*; use config::*;
use utils::*; use log::{error, info};
use logger::setup_logger; use logger::setup_logger;
use signals::set_valid_destructor; use signals::set_valid_destructor;
use std::sync::Arc;
use structs::*;
use tokio::sync::mpsc;
use utils::*;
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() { async fn main() {
@ -30,7 +29,10 @@ async fn main() {
std::process::exit(101); std::process::exit(101);
}); });
log::info!("Current runner configuration: {}", &processes.date_of_creation); log::info!(
"Current runner configuration: {}",
&processes.date_of_creation
);
log::info!("Runner is ready. Initializing..."); log::info!("Runner is ready. Initializing...");
if processes.processes.len() == 0 { if processes.processes.len() == 0 {
@ -42,7 +44,8 @@ async fn main() {
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![]; let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
for proc in processes.processes.iter() { for proc in processes.processes.iter() {
log::info!("Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", log::info!(
"Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
proc.name, proc.name,
proc.path, proc.path,
proc.dependencies.files.len(), proc.dependencies.files.len(),
@ -63,14 +66,12 @@ async fn main() {
handler.push(event); handler.push(event);
} }
// destructor addition // destructor addition
handler.push( handler.push(tokio::spawn(async move {
tokio::spawn(async move {
if let Err(_) = set_valid_destructor(Arc::new(senders)).await { if let Err(_) = set_valid_destructor(Arc::new(senders)).await {
error!("Linux signals handler creation failed. Returning..."); error!("Linux signals handler creation failed. Returning...");
return; return;
} }
}) }));
);
for i in handler { for i in handler {
i.await.unwrap(); i.await.unwrap();
} }

View File

@ -1,8 +1,8 @@
use std::sync::Arc;
use std::process::{ Command, Output };
use log::{error, warn};
use tokio::time::Duration;
use crate::structs::CustomError; use crate::structs::CustomError;
use log::{error, warn};
use std::process::{Command, Output};
use std::sync::Arc;
use tokio::time::Duration;
pub async fn get_pid(name: &str) -> Output { pub async fn get_pid(name: &str) -> Output {
let name = Arc::new(name.to_string()); let name = Arc::new(name.to_string());
@ -20,7 +20,7 @@ pub async fn get_pid(name: &str) -> Output {
} }
// ! can be with bug !!! // ! can be with bug !!!
// * APPROVED // * APPROVED
pub async fn is_active(name: &str)-> bool { pub async fn is_active(name: &str) -> bool {
let arc_name = Arc::new(name.to_string()); let arc_name = Arc::new(name.to_string());
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let output = Command::new("pidof") let output = Command::new("pidof")
@ -42,8 +42,8 @@ pub async fn is_frozen(name: &str) -> bool {
let pid = String::from_utf8_lossy(&temp.stdout); let pid = String::from_utf8_lossy(&temp.stdout);
let pid = pid.trim(); let pid = pid.trim();
let arc_pid = Arc::new(pid.to_string()); let arc_pid = Arc::new(pid.to_string());
if pid.is_empty(){ if pid.is_empty() {
return false; false
} else { } else {
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let cmd = Command::new("ps") let cmd = Command::new("ps")
@ -59,7 +59,7 @@ pub async fn is_frozen(name: &str) -> bool {
.unwrap() .unwrap()
} }
} }
pub async fn terminate_process (name: &str) { pub async fn terminate_process(name: &str) {
let _ = Command::new("pkill") let _ = Command::new("pkill")
.arg(name) .arg(name)
.output() .output()
@ -90,7 +90,7 @@ pub async fn unfreeze_process(name: &str) {
pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> { pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> {
terminate_process(name).await; terminate_process(name).await;
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
return start_process(name, path).await; start_process(name, path).await
} }
pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
@ -102,9 +102,7 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
Ok(_) => { Ok(_) => {
warn!("Process {} is running now!", name); warn!("Process {} is running now!", name);
Ok(()) Ok(())
}, }
Err(_) => { Err(_) => Err(CustomError::Fatal),
return Err(CustomError::Fatal)
},
} }
} }

View File

@ -1,44 +1,49 @@
use std::sync::Arc;
use tokio::time::{ Duration, Instant };
use tokio::sync::mpsc;
use crate::structs::{Services, CustomError};
use crate::prcs::{is_active, is_frozen}; use crate::prcs::{is_active, is_frozen};
use crate::structs::{CustomError, Services};
use log::{error, warn}; use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
pub async fn service_handler(
pub async fn service_handler(name: &str, services: &Vec<Services>, tx: Arc<mpsc::Sender<u8>>) -> Result<(), CustomError> { name: &str,
services: &Vec<Services>,
tx: Arc<mpsc::Sender<u8>>,
) -> Result<(), CustomError> {
// println!("service daemon on {}", name); // println!("service daemon on {}", name);
for serv in services { for serv in services {
if check_service(&serv.hostname, &serv.port).await.is_err() { if check_service(&serv.hostname, &serv.port).await.is_err() {
if !is_active(name).await || is_frozen(name).await { if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
error!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); error!(
"Service {}:{} is unreachable for process {}",
&serv.hostname, &serv.port, &name
);
match serv.triggers.on_lost.as_str() { match serv.triggers.on_lost.as_str() {
"stay" => { "stay" => {}
},
"stop" => { "stop" => {
if looped_service_connecting(name, serv).await.is_err() { if looped_service_connecting(name, serv).await.is_err() {
tx.send(5).await.unwrap(); tx.send(5).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(400)).await;
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
}, }
"hold" => { "hold" => {
if is_frozen(name).await { if is_frozen(name).await {
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
if looped_service_connecting(name, serv).await.is_err() { if looped_service_connecting(name, serv).await.is_err() {
tx.send(6).await.unwrap(); tx.send(6).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(400)).await;
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
}, }
_ => { _ => {
tx.send(101).await.unwrap(); tx.send(101).await.unwrap();
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
}, }
} }
} }
} }
@ -47,23 +52,26 @@ pub async fn service_handler(name: &str, services: &Vec<Services>, tx: Arc<mpsc:
Ok(()) Ok(())
} }
async fn looped_service_connecting( async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
name: &str,
serv: &Services
) -> Result<(), CustomError>
{
if serv.triggers.wait == 0 { if serv.triggers.wait == 0 {
loop { loop {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); warn!(
"Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port
);
match check_service(&serv.hostname, &serv.port).await { match check_service(&serv.hostname, &serv.port).await {
Ok(_) => { Ok(_) => {
log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name); log::info!(
"Successfully connected to {} from {} process!",
&serv.hostname,
&name
);
break; break;
}, }
Err(_) => { Err(_) => {
continue; continue;
}, }
} }
} }
Ok(()) Ok(())
@ -71,15 +79,22 @@ async fn looped_service_connecting(
let start = Instant::now(); let start = Instant::now();
while start.elapsed().as_secs() < serv.triggers.wait.into() { while start.elapsed().as_secs() < serv.triggers.wait.into() {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); warn!(
"Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port
);
match check_service(&serv.hostname, &serv.port).await { match check_service(&serv.hostname, &serv.port).await {
Ok(_) => { Ok(_) => {
log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name); log::info!(
"Successfully connected to {} from {} process!",
&serv.hostname,
&name
);
return Ok(()); return Ok(());
}, }
Err(_) => { Err(_) => {
continue; continue;
}, }
} }
} }
Err(CustomError::Fatal) Err(CustomError::Fatal)
@ -93,12 +108,12 @@ async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
match addr.to_socket_addrs() { match addr.to_socket_addrs() {
Ok(mut addrs) => { Ok(mut addrs) => {
if let Some(_) = addrs.find(|a| TcpStream::connect_timeout(a, std::time::Duration::new(1, 0)).is_ok()) { if addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
Ok(()) Ok(())
} else { } else {
Err(CustomError::Fatal) Err(CustomError::Fatal)
} }
}, }
Err(_) => Err(CustomError::Fatal), Err(_) => Err(CustomError::Fatal),
} }
} }

View File

@ -1,11 +1,11 @@
use crate::structs::CustomError; use crate::structs::CustomError;
use std::sync::Arc; use std::sync::Arc;
use tokio::io; use tokio::io;
use tokio::sync::mpsc;
use tokio::{ use tokio::{
select, select,
signal::unix::{signal, Signal, SignalKind}, signal::unix::{signal, Signal, SignalKind},
}; };
use tokio::sync::mpsc;
type SendersVec = Arc<Vec<Arc<mpsc::Sender<u8>>>>; type SendersVec = Arc<Vec<Arc<mpsc::Sender<u8>>>>;
@ -67,7 +67,7 @@ impl SigPostProcessing for Sig {
async fn post_processing(&mut self) -> io::Result<()> { async fn post_processing(&mut self) -> io::Result<()> {
// manipulations ... // manipulations ...
if let Some(_) = self.signal.recv().await { if let Some(_) = self.signal.recv().await {
log::info!("Got {}", self.sig_type); log::info!("Got {} signal", self.sig_type);
for prc in self.senders.clone().iter() { for prc in self.senders.clone().iter() {
prc.send(111).await.unwrap(); prc.send(111).await.unwrap();
} }

View File

@ -1,4 +1,4 @@
use serde::{ Deserialize, Serialize }; use serde::{Deserialize, Serialize};
/// # an Error enum (next will be deleted and replaced) /// # an Error enum (next will be deleted and replaced)
pub enum CustomError { pub enum CustomError {
@ -15,18 +15,18 @@ pub enum ConfigActuality {
pub struct Processes { pub struct Processes {
// #[serde(rename="id")] // #[serde(rename="id")]
// runner_id: usize, // runner_id: usize,
#[serde(rename="dateOfCreation")] #[serde(rename = "dateOfCreation")]
pub date_of_creation : String, pub date_of_creation: String,
#[serde(default)] #[serde(default)]
pub processes : Vec<TrackingProcess>, pub processes: Vec<TrackingProcess>,
} }
/// # struct for each process to contain info, such as name, path and dependencies /// # struct for each process to contain info, such as name, path and dependencies
/// > (needed in serialization and deserialization) /// > (needed in serialization and deserialization)
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TrackingProcess { pub struct TrackingProcess {
pub name : String, pub name: String,
pub path : String, pub path: String,
pub dependencies: Dependencies, pub dependencies: Dependencies,
} }
@ -35,7 +35,7 @@ pub struct TrackingProcess {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Dependencies { pub struct Dependencies {
#[serde(default)] #[serde(default)]
pub files : Vec<Files>, pub files: Vec<Files>,
#[serde(default)] #[serde(default)]
pub services: Vec<Services>, pub services: Vec<Services>,
} }
@ -44,36 +44,36 @@ pub struct Dependencies {
/// > (needed in serialization and deserialization) /// > (needed in serialization and deserialization)
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Files { pub struct Files {
pub filename : String, pub filename: String,
pub src : String, pub src: String,
pub triggers : FIleTriggers, pub triggers: FIleTriggers,
} }
/// # struct for containing service object with its triggers to manipulate in daemons /// # struct for containing service object with its triggers to manipulate in daemons
/// > (needed in serialization and deserialization) /// > (needed in serialization and deserialization)
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Services { pub struct Services {
pub hostname : String, pub hostname: String,
pub port : u32, pub port: u32,
pub triggers : ServiceTriggers, pub triggers: ServiceTriggers,
} }
/// # struct for instancing each service's policies such as on lost or time to wait till reachable /// # struct for instancing each service's policies such as on lost or time to wait till reachable
/// > (needed in serialization and deserialization) /// > (needed in serialization and deserialization)
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServiceTriggers { pub struct ServiceTriggers {
pub wait : u32, pub wait: u32,
pub delay: u32, pub delay: u32,
#[serde(rename="onLost")] #[serde(rename = "onLost")]
pub on_lost : String, pub on_lost: String,
} }
/// # struct for instancing each file's policies such as on-delete or onupdate events /// # struct for instancing each file's policies such as on-delete or onupdate events
/// > (needed in serialization and deserialization) /// > (needed in serialization and deserialization)
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FIleTriggers { pub struct FIleTriggers {
#[serde(rename="onDelete")] #[serde(rename = "onDelete")]
pub on_delete : String, pub on_delete: String,
#[serde(rename="onChange")] #[serde(rename = "onChange")]
pub on_change : String, pub on_change: String,
} }

View File

@ -1,25 +1,21 @@
use std::sync::Arc;
use crate::structs::TrackingProcess;
use tokio::sync::mpsc;
use inotify::Inotify;
use std::process::Command;
use crate::files::create_watcher; use crate::files::create_watcher;
use log::{error, warn};
use crate::prcs::{
is_active,
is_frozen,
terminate_process,
restart_process,
freeze_process,
unfreeze_process,
start_process
};
use tokio::time::Duration;
use tokio::join;
use crate::files::file_handler; use crate::files::file_handler;
use crate::prcs::{
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
unfreeze_process,
};
use crate::services::service_handler; use crate::services::service_handler;
use crate::structs::TrackingProcess;
use inotify::Inotify;
use log::{error, warn};
use std::process::Command;
use std::sync::Arc;
use tokio::join;
use tokio::sync::mpsc;
use tokio::time::Duration;
static GET_ID_CMD : &'static str = r"cat /proc/self/mountinfo | grep '/docker/containers/' | head -1 | awk -F '/' '{print \$6}'"; static GET_ID_CMD: &str =
r"cat /proc/self/mountinfo | grep '/docker/containers/' | head -1 | awk -F '/' '{print \$6}'";
/// # async func to run 3 main daemons (now it's more like tree-form than classical 0.1.0 form ) /// # async func to run 3 main daemons (now it's more like tree-form than classical 0.1.0 form )
/// > hint : give mpsc with capacity 1 to jump over potential errors during running process /// > hint : give mpsc with capacity 1 to jump over potential errors during running process
@ -27,15 +23,15 @@ static GET_ID_CMD : &'static str = r"cat /proc/self/mountinfo | grep '/docker/co
pub async fn run_daemons( pub async fn run_daemons(
proc: Arc<TrackingProcess>, proc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
rx: &mut mpsc::Receiver<u8> rx: &mut mpsc::Receiver<u8>,
) ) {
{
// creating watchers + ---buffers--- // creating watchers + ---buffers---
let mut watchers: Vec<Inotify> = vec![]; let mut watchers: Vec<Inotify> = vec![];
for file in proc.dependencies.files.clone().into_iter() { for file in proc.dependencies.files.clone().into_iter() {
watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
} }
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> = Arc::new(tokio::sync::Mutex::new(watchers)); let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
Arc::new(tokio::sync::Mutex::new(watchers));
loop { loop {
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
@ -146,27 +142,30 @@ pub async fn run_daemons(
} }
} }
// check process status daemon // check process status daemon
pub async fn running_handler pub async fn running_handler(
(
prc: Arc<TrackingProcess>, prc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>> watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) ) {
{
// services and files check (once) // services and files check (once)
let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone()); let files_check = file_handler(
&prc.name,
&prc.dependencies.files,
tx.clone(),
watchers.clone(),
);
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
let res = join!(files_check, services_check); let res = join!(files_check, services_check);
// if inactive -> spawn checks -> active is true // if inactive -> spawn checks -> active is true
if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok(){ if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
if start_process(&prc.name, &prc.path).await.is_err() { if start_process(&prc.name, &prc.path).await.is_err() {
tx.send(3).await.unwrap(); tx.send(3).await.unwrap();
return; return;
} }
} }
// if frozen -> spawn checks -> unfreeze is true // if frozen -> spawn checks -> unfreeze is true
else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok(){ else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
tx.send(10).await.unwrap(); tx.send(10).await.unwrap();
return; return;
} }
@ -174,15 +173,10 @@ pub async fn running_handler
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
pub fn get_container_id() -> Option<String> { pub fn get_container_id() -> Option<String> {
match Command::new("sh -c").arg(GET_ID_CMD).output() { match Command::new("sh -c").arg(GET_ID_CMD).output() {
Ok(output) => { Ok(output) => Some(String::from_utf8_lossy(&output.stdout).to_string()),
Some(String::from_utf8_lossy(&output.stdout).to_string()) Err(_) => None,
},
Err(_) => {
None
},
} }
} }