feature/preload-cli-adj #14
|
|
@ -1,9 +1,11 @@
|
||||||
[package]
|
[package]
|
||||||
name = "noxis-cli"
|
name = "noxis-cli"
|
||||||
version = "0.1.5"
|
version = "0.1.6"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
anyhow = "1.0.94"
|
||||||
clap = { version = "4.5.22", features = ["derive"] }
|
clap = { version = "4.5.22", features = ["derive"] }
|
||||||
serde = { version = "1.0.215", features = ["derive"] }
|
serde = { version = "1.0.215", features = ["derive"] }
|
||||||
serde_json = "1.0.133"
|
serde_json = "1.0.133"
|
||||||
|
tokio = { version = "1.42.0", features = ["full", "net"] }
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ pub struct Cli {
|
||||||
)]
|
)]
|
||||||
command : Commands,
|
command : Commands,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Subcommand)]
|
#[derive(Debug, Subcommand)]
|
||||||
pub enum Commands {
|
pub enum Commands {
|
||||||
#[command(
|
#[command(
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
mod cli;
|
||||||
|
|
||||||
|
pub use cli::*;
|
||||||
|
|
@ -1,11 +1,16 @@
|
||||||
mod cli;
|
mod cli;
|
||||||
|
mod net;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use cli::Cli;
|
use cli::Cli;
|
||||||
|
use net::{create_tcp_stream, try_send};
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
fn main() -> Result<(), std::io::Error>{
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<()>{
|
||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
dbg!(&cli);
|
dbg!(&cli);
|
||||||
println!("{:?}", cli);
|
// println!("{:?}", cli);
|
||||||
|
try_send(create_tcp_stream().await, cli).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,27 @@
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::time::{Duration, sleep};
|
||||||
|
use anyhow::Result;
|
||||||
|
use super::Cli;
|
||||||
|
|
||||||
|
|
||||||
|
pub async fn create_tcp_stream() -> Result<TcpStream> {
|
||||||
|
let stream = TcpStream::connect("127.0.0.1:7753").await?;
|
||||||
|
Ok(stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn try_send(stream: Result<TcpStream>, params: Cli) -> Result<()> {
|
||||||
|
let mut stream = stream?;
|
||||||
|
loop {
|
||||||
|
if stream.writable().await.is_err() {
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let msg = format!("{:?}", params);
|
||||||
|
// let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@";
|
||||||
|
stream.write_all(msg.as_bytes()).await?;
|
||||||
|
// ...
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "noxis-rs"
|
name = "noxis-rs"
|
||||||
version = "0.10.11"
|
version = "0.11.3"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
|
||||||
|
|
@ -1,26 +1,28 @@
|
||||||
mod options;
|
mod options;
|
||||||
mod utils;
|
mod utils;
|
||||||
|
|
||||||
|
use anyhow::Error;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use options::{config::*, preboot};
|
use options::config::*;
|
||||||
use options::logger::setup_logger;
|
use options::logger::setup_logger;
|
||||||
use options::signals::set_valid_destructor;
|
use options::signals::set_valid_destructor;
|
||||||
use options::structs::Processes;
|
use options::structs::Processes;
|
||||||
|
use options::cli_pipeline::init_cli_pipeline;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use utils::*;
|
use utils::*;
|
||||||
#[allow(unused_imports)]
|
|
||||||
use options::preboot::PrebootParams;
|
use options::preboot::PrebootParams;
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
async fn main() {
|
async fn main() -> anyhow::Result<()>{
|
||||||
let preboot = PrebootParams::parse().validate();
|
let preboot = Arc::new(PrebootParams::parse().validate()?);
|
||||||
|
|
||||||
if let Err(_) = preboot {
|
// if let Err(_) = preboot {
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
// let preboot = Arc::new(preboot);
|
||||||
|
|
||||||
let _ = setup_logger();
|
let _ = setup_logger();
|
||||||
|
|
||||||
|
|
@ -28,9 +30,9 @@ async fn main() {
|
||||||
|
|
||||||
// setting up redis connection \
|
// setting up redis connection \
|
||||||
// then conf checks to choose the most actual \
|
// then conf checks to choose the most actual \
|
||||||
let processes: Processes = get_actual_config().await.unwrap_or_else(|| {
|
let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
|
||||||
error!("No actual configuration for runner. Stopping...");
|
error!("No actual configuration for runner. Stopping...");
|
||||||
std::process::exit(101);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
|
|
@ -41,7 +43,7 @@ async fn main() {
|
||||||
|
|
||||||
if processes.processes.is_empty() {
|
if processes.processes.is_empty() {
|
||||||
error!("Processes list is null, runner-rs initialization is stopped");
|
error!("Processes list is null, runner-rs initialization is stopped");
|
||||||
return;
|
return Err(Error::msg("Empty processes segment in config"));
|
||||||
}
|
}
|
||||||
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
||||||
// is in need to send to the signals handler thread
|
// is in need to send to the signals handler thread
|
||||||
|
|
@ -84,13 +86,18 @@ async fn main() {
|
||||||
|
|
||||||
// remote config update subscription
|
// remote config update subscription
|
||||||
handler.push(tokio::spawn(async move {
|
handler.push(tokio::spawn(async move {
|
||||||
let _ = subscribe_config_stream(Arc::new(processes)).await;
|
let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
|
||||||
|
}));
|
||||||
|
|
||||||
|
// cli pipeline
|
||||||
|
handler.push(tokio::spawn(async move {
|
||||||
|
let _ = init_cli_pipeline().await;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
for i in handler {
|
for i in handler {
|
||||||
let _ = i.await;
|
let _ = i.await;
|
||||||
}
|
}
|
||||||
return;
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: integration tests
|
// todo: integration tests
|
||||||
|
|
|
||||||
|
|
@ -5,3 +5,4 @@ pub mod logger;
|
||||||
pub mod signals;
|
pub mod signals;
|
||||||
pub mod structs;
|
pub mod structs;
|
||||||
pub mod preboot;
|
pub mod preboot;
|
||||||
|
pub mod cli_pipeline;
|
||||||
|
|
@ -0,0 +1,60 @@
|
||||||
|
use log::{error, info, warn};
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use anyhow::{Result as DynResult, Error};
|
||||||
|
use tokio::time::{sleep, Duration};
|
||||||
|
use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}};
|
||||||
|
// use std::io::BufReader;
|
||||||
|
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
|
||||||
|
|
||||||
|
|
||||||
|
pub async fn init_cli_pipeline() -> DynResult<()> {
|
||||||
|
return match init_listener().await {
|
||||||
|
Some(list) => {
|
||||||
|
loop {
|
||||||
|
if let Ok((socket, addr)) = list.accept().await {
|
||||||
|
// isolation
|
||||||
|
if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() {
|
||||||
|
warn!("Declined attempt to connect TCP-socket from {}", addr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
process_connection(socket).await;
|
||||||
|
}
|
||||||
|
sleep(Duration::from_millis(500)).await;
|
||||||
|
}
|
||||||
|
// Ok(())
|
||||||
|
},
|
||||||
|
None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn init_listener() -> Option<TcpListener> {
|
||||||
|
return match TcpListener::bind("127.0.0.1:7753").await {
|
||||||
|
Ok(listener) => {
|
||||||
|
info!("Runner is listening localhost:7753");
|
||||||
|
Some(listener)
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Cannot create TCP listener for CLI");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_connection(mut stream: TcpStream) {
|
||||||
|
// loop{
|
||||||
|
// stream.
|
||||||
|
// }
|
||||||
|
let buf_reader = BufReader::new(stream.borrow_mut());
|
||||||
|
let mut rqst = buf_reader.lines();
|
||||||
|
|
||||||
|
|
||||||
|
while let Ok(Some(line)) = rqst.next_line().await {
|
||||||
|
if line.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
println!("{}", line);
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!";
|
||||||
|
stream.write_all(response.as_bytes()).await.unwrap();
|
||||||
|
}
|
||||||
|
|
@ -7,9 +7,11 @@ use std::os::unix::process::CommandExt;
|
||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{env, fs};
|
use std::{env, fs};
|
||||||
use tokio::time::Duration;
|
use std::fmt::format;
|
||||||
|
use super::preboot::PrebootParams;
|
||||||
|
use tokio::time::{Duration, sleep};
|
||||||
|
|
||||||
const CONFIG_PATH: &str = "settings.json";
|
// const CONFIG_PATH: &str = "settings.json";
|
||||||
|
|
||||||
/// # Fn `load_processes`
|
/// # Fn `load_processes`
|
||||||
/// ## for reading and parsing *local* storing config
|
/// ## for reading and parsing *local* storing config
|
||||||
|
|
@ -46,18 +48,21 @@ fn load_processes(json_filename: &str) -> Option<Processes> {
|
||||||
///
|
///
|
||||||
/// *depends on* : struct `Processes`
|
/// *depends on* : struct `Processes`
|
||||||
///
|
///
|
||||||
pub async fn get_actual_config() -> Option<Processes> {
|
pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes> {
|
||||||
// * if no local conf -> loop and +inf getting conf from redis server
|
// * if no local conf -> loop and +inf getting conf from redis server
|
||||||
// * if local conf -> once getting conf from redis server
|
// * if local conf -> once getting conf from redis server
|
||||||
match load_processes(CONFIG_PATH) {
|
let config_path = params.config.to_str()?;
|
||||||
|
info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url);
|
||||||
|
match load_processes(config_path) {
|
||||||
Some(local_conf) => {
|
Some(local_conf) => {
|
||||||
info!(
|
info!(
|
||||||
"Found local configuration, version - {}",
|
"Found local configuration, version - {}",
|
||||||
&local_conf.date_of_creation
|
&local_conf.date_of_creation
|
||||||
);
|
);
|
||||||
|
if !params.no_remote_config {
|
||||||
if let Some(remote_conf) =
|
if let Some(remote_conf) =
|
||||||
// TODO : rework with pubsub mech
|
// TODO : rework with pubsub mech
|
||||||
once_get_remote_configuration(&format!("redis://{}/", local_conf.config_server))
|
once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url))
|
||||||
{
|
{
|
||||||
return match config_comparing(&local_conf, &remote_conf) {
|
return match config_comparing(&local_conf, &remote_conf) {
|
||||||
ConfigActuality::Local => {
|
ConfigActuality::Local => {
|
||||||
|
|
@ -66,24 +71,26 @@ pub async fn get_actual_config() -> Option<Processes> {
|
||||||
}
|
}
|
||||||
ConfigActuality::Remote => {
|
ConfigActuality::Remote => {
|
||||||
info!("Pulled config is more actual. Saving changes!");
|
info!("Pulled config is more actual. Saving changes!");
|
||||||
if save_new_config(&remote_conf, CONFIG_PATH).is_err() {
|
if save_new_config(&remote_conf, config_path).is_err() {
|
||||||
error!("Saving changes process failed due to unexpected error...")
|
error!("Saving changes process failed due to unexpected error...")
|
||||||
}
|
}
|
||||||
Some(remote_conf)
|
Some(remote_conf)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
}
|
||||||
Some(local_conf)
|
Some(local_conf)
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
warn!("No local valid conf was found. Trying to pull remote one...");
|
warn!("No local valid conf was found. Trying to pull remote one...");
|
||||||
let mut conn = get_connection_watcher(&open_watcher("redis://localhost/"));
|
if !params.no_remote_config {
|
||||||
let remote_config = get_remote_conf_watcher(&mut conn).await;
|
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url)));
|
||||||
if let Some(conf) = remote_config {
|
if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
|
||||||
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
|
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
|
||||||
let _ = save_new_config(&conf, CONFIG_PATH);
|
let _ = save_new_config(&conf, config_path);
|
||||||
return Some(conf);
|
return Some(conf);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -182,23 +189,23 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
|
||||||
if remote.is_none() {
|
if remote.is_none() {
|
||||||
error!("Pulled config is invalid. Check it in Redis Server");
|
error!("Pulled config is invalid. Check it in Redis Server");
|
||||||
}
|
}
|
||||||
return remote;
|
remote
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
error!("Cannot extract payload from new message. Check Redis Server state");
|
error!("Cannot extract payload from new message. Check Redis Server state");
|
||||||
return None;
|
None
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
warn!("Cannot get config from Redis Server. Empty channel");
|
warn!("Cannot get config from Redis Server. Empty channel");
|
||||||
return None;
|
None
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
error!("Redis subscription process failed. Check Redis configuration!");
|
error!("Redis subscription process failed. Check Redis configuration!");
|
||||||
return None;
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -311,8 +318,13 @@ fn restart_main_thread() -> std::io::Result<()> {
|
||||||
///
|
///
|
||||||
/// *depends on* : `Processes`
|
/// *depends on* : `Processes`
|
||||||
///
|
///
|
||||||
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(), CustomError> {
|
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
|
||||||
if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) {
|
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
|
||||||
|
|
||||||
|
if params.no_sub || params.no_remote_config {
|
||||||
|
return Err(CustomError::Fatal);
|
||||||
|
}
|
||||||
|
if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) {
|
||||||
if let Ok(mut conn) = client.get_connection() {
|
if let Ok(mut conn) = client.get_connection() {
|
||||||
match crate::utils::get_container_id() {
|
match crate::utils::get_container_id() {
|
||||||
Some(channel_name) => {
|
Some(channel_name) => {
|
||||||
|
|
@ -330,8 +342,8 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(),
|
||||||
match config_comparing(&actual_prcs, &remote_config) {
|
match config_comparing(&actual_prcs, &remote_config) {
|
||||||
ConfigActuality::Remote => {
|
ConfigActuality::Remote => {
|
||||||
warn!("Pulled config is actual. Saving and restarting...");
|
warn!("Pulled config is actual. Saving and restarting...");
|
||||||
if save_new_config(&remote_config, CONFIG_PATH).is_err() {
|
if save_new_config(&remote_config, config_path).is_err() {
|
||||||
error!("Error with saving new config to {}. Stopping sub mechanism...", &CONFIG_PATH);
|
error!("Error with saving new config to {}. Stopping sub mechanism...", config_path);
|
||||||
return Err(CustomError::Fatal);
|
return Err(CustomError::Fatal);
|
||||||
}
|
}
|
||||||
if restart_main_thread().is_err() {
|
if restart_main_thread().is_err() {
|
||||||
|
|
@ -352,7 +364,7 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
|
sleep(Duration::from_secs(30)).await;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
|
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
|
||||||
|
|
@ -433,7 +445,7 @@ fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomEr
|
||||||
Err(_) => Err(CustomError::Fatal),
|
Err(_) => Err(CustomError::Fatal),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => return Err(CustomError::Fatal),
|
Err(_) => Err(CustomError::Fatal),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => Err(CustomError::Fatal),
|
Err(_) => Err(CustomError::Fatal),
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ use clap::Parser;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
#[derive(clap::ValueEnum, Debug, Clone)]
|
#[derive(clap::ValueEnum, Debug, Clone)]
|
||||||
enum MetricsPrebootParams {
|
pub enum MetricsPrebootParams {
|
||||||
Full,
|
Full,
|
||||||
System,
|
System,
|
||||||
Processes,
|
Processes,
|
||||||
|
|
@ -68,28 +68,28 @@ pub struct PrebootParams {
|
||||||
conflicts_with="no_hostagent",
|
conflicts_with="no_hostagent",
|
||||||
help="To set .sock file's path used in communication with host-agent"
|
help="To set .sock file's path used in communication with host-agent"
|
||||||
)]
|
)]
|
||||||
socket_path : PathBuf,
|
pub socket_path : PathBuf,
|
||||||
#[arg(
|
#[arg(
|
||||||
long = "log-to",
|
long = "log-to",
|
||||||
default_value="./",
|
default_value="./",
|
||||||
conflicts_with="no_logs",
|
conflicts_with="no_logs",
|
||||||
help="To set a path to logs directory"
|
help="To set a path to logs directory"
|
||||||
)]
|
)]
|
||||||
log_to : PathBuf,
|
pub log_to : PathBuf,
|
||||||
#[arg(
|
#[arg(
|
||||||
long = "remote-server-url",
|
long = "remote-server-url",
|
||||||
default_value="redis://localhost",
|
default_value="localhost",
|
||||||
conflicts_with="no_remote_config",
|
conflicts_with="no_remote_config",
|
||||||
help = "To set url of remote config server using in remote config pulling mechanism"
|
help = "To set url of remote config server using in remote config pulling mechanism"
|
||||||
)]
|
)]
|
||||||
remote_server_url : String,
|
pub remote_server_url : String,
|
||||||
#[arg(
|
#[arg(
|
||||||
long = "config",
|
long = "config",
|
||||||
short,
|
short,
|
||||||
default_value="settings.json",
|
default_value="settings.json",
|
||||||
help="To set local config file path"
|
help="To set local config file path"
|
||||||
)]
|
)]
|
||||||
config : PathBuf,
|
pub config : PathBuf,
|
||||||
|
|
||||||
// value enum params (metrics)
|
// value enum params (metrics)
|
||||||
#[arg(
|
#[arg(
|
||||||
|
|
@ -98,25 +98,23 @@ pub struct PrebootParams {
|
||||||
default_value_t=MetricsPrebootParams::Full,
|
default_value_t=MetricsPrebootParams::Full,
|
||||||
help="To set metrics grubbing mode"
|
help="To set metrics grubbing mode"
|
||||||
)]
|
)]
|
||||||
metrics: MetricsPrebootParams,
|
pub metrics: MetricsPrebootParams,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PrebootParams {
|
impl PrebootParams {
|
||||||
pub fn validate(self) -> Result<Self> {
|
pub fn validate(self) -> Result<Self> {
|
||||||
if !self.socket_path.exists() {
|
if !self.socket_path.exists() && !self.no_hostagent {
|
||||||
eprintln!("Socket-file {} doesn't exist. Cannot start", &self.socket_path.display());
|
return Err(Error::msg("Socket-file not found or Noxis can't read it. Cannot start"));
|
||||||
return Err(Error::msg("Socket-file Not Found"));
|
|
||||||
}
|
}
|
||||||
// existing log dir
|
// existing log dir
|
||||||
if !self.log_to.exists() {
|
if !self.log_to.exists() && !self.no_logs {
|
||||||
eprintln!("Log directory {} doesn't exist", &self.log_to.display());
|
return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
|
||||||
return Err(Error::msg("Log Directory Not Found. Cannot start"));
|
|
||||||
}
|
}
|
||||||
// existing sock file
|
// existing sock file
|
||||||
if !self.config.exists() {
|
if !self.config.exists() {
|
||||||
eprintln!("Local config file {} doesn't exist", &self.config.display());
|
return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
|
||||||
return Err(Error::msg("Local Config Not Found. Cannot start"));
|
|
||||||
}
|
}
|
||||||
|
// redis server check
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,8 +4,9 @@ pub mod metrics;
|
||||||
pub mod prcs;
|
pub mod prcs;
|
||||||
pub mod services;
|
pub mod services;
|
||||||
|
|
||||||
//
|
// TODO : saving current flags state
|
||||||
|
|
||||||
|
use crate::options::structs::CustomError;
|
||||||
use crate::options::structs::TrackingProcess;
|
use crate::options::structs::TrackingProcess;
|
||||||
use files::create_watcher;
|
use files::create_watcher;
|
||||||
use files::file_handler;
|
use files::file_handler;
|
||||||
|
|
@ -60,17 +61,27 @@ pub async fn run_daemons(
|
||||||
loop {
|
loop {
|
||||||
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
|
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = run_hand => {},
|
_ = run_hand => continue,
|
||||||
_val = rx.recv() => {
|
_val = rx.recv() => {
|
||||||
match _val.unwrap() {
|
if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
|
||||||
|
match val {
|
||||||
// 1 - File-dependency handling error -> terminating (after waiting)
|
// 1 - File-dependency handling error -> terminating (after waiting)
|
||||||
1 => {
|
1 => {
|
||||||
if is_active(&proc.name).await {
|
if is_active(&proc.name).await {
|
||||||
error!("File-dependency handling error: Terminating {} process ..." , &proc.name);
|
error!("File-dependency handling error: Terminating {} process ..." , &proc.name);
|
||||||
terminate_process(&proc.name).await;
|
terminate_process(&proc.name).await;
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
}
|
}
|
||||||
return;
|
// return;
|
||||||
},
|
},
|
||||||
// 2 - File-dependency handling error -> holding (after waiting)
|
// 2 - File-dependency handling error -> holding (after waiting)
|
||||||
2 => {
|
2 => {
|
||||||
|
|
@ -83,7 +94,7 @@ pub async fn run_daemons(
|
||||||
// 3 - Running process error
|
// 3 - Running process error
|
||||||
3 => {
|
3 => {
|
||||||
error!("Error due to starting {} process", &proc.name);
|
error!("Error due to starting {} process", &proc.name);
|
||||||
break;
|
return Err(CustomError::Fatal)
|
||||||
},
|
},
|
||||||
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
|
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
|
||||||
4 => {
|
4 => {
|
||||||
|
|
@ -95,7 +106,7 @@ pub async fn run_daemons(
|
||||||
if is_active(&proc.name).await {
|
if is_active(&proc.name).await {
|
||||||
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
|
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
|
||||||
terminate_process(&proc.name).await;
|
terminate_process(&proc.name).await;
|
||||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
|
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
|
||||||
|
|
@ -112,7 +123,7 @@ pub async fn run_daemons(
|
||||||
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
|
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
|
||||||
terminate_process(&proc.name).await;
|
terminate_process(&proc.name).await;
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
return;
|
return Err(CustomError::Fatal)
|
||||||
},
|
},
|
||||||
// // 8 - File-dependency change -> restarting (after check)
|
// // 8 - File-dependency change -> restarting (after check)
|
||||||
8 => {
|
8 => {
|
||||||
|
|
@ -149,14 +160,14 @@ pub async fn run_daemons(
|
||||||
if is_active(&proc.name).await {
|
if is_active(&proc.name).await {
|
||||||
terminate_process(&proc.name).await;
|
terminate_process(&proc.name).await;
|
||||||
}
|
}
|
||||||
break;
|
return Err(CustomError::Fatal)
|
||||||
},
|
},
|
||||||
//
|
//
|
||||||
// 121 - Cannot create valid watcher for file dependency
|
// 121 - Cannot create valid watcher for file dependency
|
||||||
121 => {
|
121 => {
|
||||||
error!("Cannot create valid watcher for {}'s file dependency. Terminating thread...", proc.name);
|
error!("Cannot create valid watcher for {}'s file dependency. Terminating thread...", proc.name);
|
||||||
let _ = terminate_process("runner-rs").await;
|
let _ = terminate_process("runner-rs").await;
|
||||||
break;
|
return Err(CustomError::Fatal)
|
||||||
},
|
},
|
||||||
// 111 - global thread termination with killing current child in a face
|
// 111 - global thread termination with killing current child in a face
|
||||||
// of a current process
|
// of a current process
|
||||||
|
|
@ -170,15 +181,10 @@ pub async fn run_daemons(
|
||||||
log::info!("Process {} is already terminated!", proc.name);
|
log::info!("Process {} is already terminated!", proc.name);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
},
|
},
|
||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
},
|
Ok(())
|
||||||
}
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
}
|
||||||
// check process status daemon
|
// check process status daemon
|
||||||
/// # Fn `run_daemons`
|
/// # Fn `run_daemons`
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue