Compare commits

..

No commits in common. "56769f54b97858ef62f4a1961dc186555ff7bd77" and "1c6729daab13ff4c34f25eaea01cb7c19e543309" have entirely different histories.

6 changed files with 118 additions and 197 deletions

View File

@ -1,3 +0,0 @@
mod cli;
pub use cli::*;

View File

@ -3,16 +3,14 @@ mod utils;
use clap::Parser; use clap::Parser;
use log::{error, info}; use log::{error, info};
use options::config::*; use options::{config::*, preboot};
use options::logger::setup_logger; use options::logger::setup_logger;
use options::signals::set_valid_destructor; use options::signals::set_valid_destructor;
use options::structs::Processes; use options::structs::Processes;
use options::cli_pipeline::init_cli_pipeline;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use utils::*; use utils::*;
#[allow(unused_imports)] #[allow(unused_imports)]
use options::preboot::PrebootParams; use options::preboot::PrebootParams;
@ -89,11 +87,6 @@ async fn main() {
let _ = subscribe_config_stream(Arc::new(processes)).await; let _ = subscribe_config_stream(Arc::new(processes)).await;
})); }));
// cli pipeline
handler.push(tokio::spawn(async move {
let _ = init_cli_pipeline().await;
}));
for i in handler { for i in handler {
let _ = i.await; let _ = i.await;
} }

View File

@ -4,5 +4,4 @@ pub mod config;
pub mod logger; pub mod logger;
pub mod signals; pub mod signals;
pub mod structs; pub mod structs;
pub mod preboot; pub mod preboot;
pub mod cli_pipeline;

View File

@ -1,61 +0,0 @@
use log::{error, info, warn};
use tokio::net::{TcpListener, TcpStream};
use anyhow::{Result as DynResult, Error};
use tokio::time::{sleep, Duration};
use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}};
// use std::io::BufReader;
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
pub async fn init_cli_pipeline() -> DynResult<()> {
return match init_listener().await {
Some(list) => {
loop {
if let Ok((socket, addr)) = list.accept().await {
// isolation
if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() {
warn!("Declined attempt to connect TCP-socket from {}", addr);
continue;
}
process_connection(socket).await;
}
sleep(Duration::from_millis(500)).await;
}
// Ok(())
},
None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use"))
}
}
async fn init_listener() -> Option<TcpListener> {
return match TcpListener::bind("127.0.0.1:7753").await {
Ok(listener) => {
info!("Runner is listening localhost:7753");
Some(listener)
},
Err(_) => {
error!("Cannot create TCP listener for CLI");
None
}
}
}
async fn process_connection(mut stream: TcpStream) {
// loop{
// stream.
// }
let buf_reader = BufReader::new(stream.borrow_mut());
let mut rqst = buf_reader.lines();
while let Ok(Some(line)) = rqst.next_line().await {
if line.is_empty() {
break;
}
println!("{}", line);
}
// .map(|result| result.unwrap())
// .take_while(|line| !line.is_empty())
// .collect();
let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!";
stream.write_all(response.as_bytes()).await.unwrap();
}

View File

@ -117,7 +117,6 @@ impl PrebootParams {
eprintln!("Local config file {} doesn't exist", &self.config.display()); eprintln!("Local config file {} doesn't exist", &self.config.display());
return Err(Error::msg("Local Config Not Found. Cannot start")); return Err(Error::msg("Local Config Not Found. Cannot start"));
} }
// redis server check
Ok(self) Ok(self)
} }
} }

View File

@ -4,9 +4,8 @@ pub mod metrics;
pub mod prcs; pub mod prcs;
pub mod services; pub mod services;
// TODO : saving current flags state //
use crate::options::structs::CustomError;
use crate::options::structs::TrackingProcess; use crate::options::structs::TrackingProcess;
use files::create_watcher; use files::create_watcher;
use files::file_handler; use files::file_handler;
@ -61,130 +60,125 @@ pub async fn run_daemons(
loop { loop {
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
tokio::select! { tokio::select! {
_ = run_hand => continue, _ = run_hand => {},
_val = rx.recv() => { _val = rx.recv() => {
if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { match _val.unwrap() {
return; // 1 - File-dependency handling error -> terminating (after waiting)
} 1 => {
if is_active(&proc.name).await {
error!("File-dependency handling error: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
return;
},
// 2 - File-dependency handling error -> holding (after waiting)
2 => {
if !is_frozen(&proc.name).await {
error!("File-dependency handling error: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
},
// 3 - Running process error
3 => {
error!("Error due to starting {} process", &proc.name);
break;
},
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
4 => {
// warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
tokio::time::sleep(Duration::from_millis(100)).await;
},
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
5 => {
if is_active(&proc.name).await {
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(1000)).await;
}
},
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
6 => {
// println!("holding {}-{}", proc.name, is_active(&proc.name).await);
if !is_frozen(&proc.name).await {
error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_secs(1)).await;
}
},
// // 7 - File-dependency change -> terminating (after check)
7 => {
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
return;
},
// // 8 - File-dependency change -> restarting (after check)
8 => {
warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
let _ = restart_process(&proc.name, &proc.path).await;
tokio::time::sleep(Duration::from_millis(100)).await;
},
// // 9 - File-dependency change -> staying (after check)
9 => {
// no need to trash logs
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
tokio::time::sleep(Duration::from_millis(100)).await;
},
// 10 - Process unfreaze call via file handler (or service handler)
10 | 11 => {
if is_frozen(&proc.name).await {
warn!("Unfreezing process {} call...", &proc.name);
unfreeze_process(&proc.name).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
},
// 11 - Process unfreaze call via service handler
// 11 => {
// if is_frozen(&proc.name).await {
// warn!("Unfreezing process {} call...", &proc.name);
// unfreeze_process(&proc.name).await;
// }
// tokio::time::sleep(Duration::from_millis(100)).await;
// },
// 101 - Impermissible trigger values in JSON
101 => {
error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", proc.name);
if is_active(&proc.name).await {
terminate_process(&proc.name).await;
}
break;
},
//
// 121 - Cannot create valid watcher for file dependency
121 => {
error!("Cannot create valid watcher for {}'s file dependency. Terminating thread...", proc.name);
let _ = terminate_process("runner-rs").await;
break;
},
// 111 - global thread termination with killing current child in a face
// of a current process
111 => {
warn!("Terminating {}'s child processes...", &proc.name);
match is_active(&proc.name).await {
true => {
terminate_process(&proc.name).await;
},
false => {
log::info!("Process {} is already terminated!", proc.name);
},
}
break;
},
_ => {},
}
}, },
} }
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
} tokio::task::yield_now().await;
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
match val {
// 1 - File-dependency handling error -> terminating (after waiting)
1 => {
if is_active(&proc.name).await {
error!("File-dependency handling error: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(500)).await;
}
// return;
},
// 2 - File-dependency handling error -> holding (after waiting)
2 => {
if !is_frozen(&proc.name).await {
error!("File-dependency handling error: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
},
// 3 - Running process error
3 => {
error!("Error due to starting {} process", &proc.name);
return Err(CustomError::Fatal)
},
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
4 => {
// warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
tokio::time::sleep(Duration::from_millis(100)).await;
},
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
5 => {
if is_active(&proc.name).await {
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(500)).await;
}
},
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
6 => {
// println!("holding {}-{}", proc.name, is_active(&proc.name).await);
if !is_frozen(&proc.name).await {
error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_secs(1)).await;
}
},
// // 7 - File-dependency change -> terminating (after check)
7 => {
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
return Err(CustomError::Fatal)
},
// // 8 - File-dependency change -> restarting (after check)
8 => {
warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
let _ = restart_process(&proc.name, &proc.path).await;
tokio::time::sleep(Duration::from_millis(100)).await;
},
// // 9 - File-dependency change -> staying (after check)
9 => {
// no need to trash logs
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
tokio::time::sleep(Duration::from_millis(100)).await;
},
// 10 - Process unfreaze call via file handler (or service handler)
10 | 11 => {
if is_frozen(&proc.name).await {
warn!("Unfreezing process {} call...", &proc.name);
unfreeze_process(&proc.name).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
},
// 11 - Process unfreaze call via service handler
// 11 => {
// if is_frozen(&proc.name).await {
// warn!("Unfreezing process {} call...", &proc.name);
// unfreeze_process(&proc.name).await;
// }
// tokio::time::sleep(Duration::from_millis(100)).await;
// },
// 101 - Impermissible trigger values in JSON
101 => {
error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", proc.name);
if is_active(&proc.name).await {
terminate_process(&proc.name).await;
}
return Err(CustomError::Fatal)
},
//
// 121 - Cannot create valid watcher for file dependency
121 => {
error!("Cannot create valid watcher for {}'s file dependency. Terminating thread...", proc.name);
let _ = terminate_process("runner-rs").await;
return Err(CustomError::Fatal)
},
// 111 - global thread termination with killing current child in a face
// of a current process
111 => {
warn!("Terminating {}'s child processes...", &proc.name);
match is_active(&proc.name).await {
true => {
terminate_process(&proc.name).await;
},
false => {
log::info!("Process {} is already terminated!", proc.name);
},
}
},
_ => {},
}
Ok(())
} }
// check process status daemon // check process status daemon
/// # Fn `run_daemons` /// # Fn `run_daemons`