commit
dafc6dcbeb
|
|
@ -0,0 +1 @@
|
|||
NOXIS_SOCKET_PATH = "/home/vladislavd/diplom_code/noxis-rs/noxis.sock"
|
||||
|
|
@ -6,6 +6,7 @@ edition = "2021"
|
|||
[dependencies]
|
||||
anyhow = "1.0.94"
|
||||
clap = { version = "4.5.22", features = ["derive"] }
|
||||
dotenv = "0.15.0"
|
||||
serde = { version = "1.0.215", features = ["derive"] }
|
||||
serde_json = "1.0.133"
|
||||
thiserror = "2.0.11"
|
||||
|
|
|
|||
|
|
@ -79,7 +79,23 @@ pub enum ConfigAction {
|
|||
about = "To reset all config settings",
|
||||
)]
|
||||
Reset,
|
||||
#[command(
|
||||
about = "To get current Noxis configuration",
|
||||
name = "ls"
|
||||
)]
|
||||
Show(EnvConfig),
|
||||
}
|
||||
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
|
||||
pub struct EnvConfig {
|
||||
// flag
|
||||
#[arg(
|
||||
long = "env",
|
||||
action,
|
||||
help = "to read environment vars configuration",
|
||||
)]
|
||||
pub is_env : bool,
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
|
||||
pub struct LocalConfig {
|
||||
|
|
@ -148,4 +164,28 @@ pub enum ProcessAction {
|
|||
about = "To get info about current process's services-dependencies",
|
||||
)]
|
||||
Services,
|
||||
}
|
||||
|
||||
pub mod metrics_models {
|
||||
pub enum MetricsMode {
|
||||
Full,
|
||||
// system
|
||||
Cpu,
|
||||
Memory,
|
||||
Ram,
|
||||
Rom,
|
||||
Network,
|
||||
// processes
|
||||
Processes
|
||||
// Config
|
||||
}
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
pub fn validate_socket(mut self) -> Self {
|
||||
if let Ok(path) = std::env::var("NOXIS_SOCKET_PATH") {
|
||||
self.socket = path;
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
|
|
@ -2,7 +2,7 @@ use thiserror::Error;
|
|||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum NoxisCliError {
|
||||
#[error("Can't find socket `{0}`. Error : {1}")]
|
||||
#[error("Can't find socket `{0}`. {1}")]
|
||||
NoxisDaemonMissing(String, String),
|
||||
#[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")]
|
||||
PortIsNotWritable,
|
||||
|
|
|
|||
|
|
@ -25,6 +25,6 @@ pub async fn try_send(cli: Cli) -> Result<()> {
|
|||
.await
|
||||
.map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?;
|
||||
|
||||
println!("Received response: {}", String::from_utf8_lossy(&response));
|
||||
println!("{}", String::from_utf8_lossy(&response));
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -9,7 +9,8 @@ use anyhow::Result;
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()>{
|
||||
let cli = Cli::parse();
|
||||
dotenv::dotenv().ok();
|
||||
let cli = Cli::parse().validate_socket();
|
||||
try_send(cli).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,4 +9,6 @@ NOXIS_HAGENT_SOCKET_PATH = "/var/run/example/hostagent.sock"
|
|||
NOXIS_LOG_TO = "/var/log/noxis/noxis.log"
|
||||
NOXIS_REMOTE_SERVER_URL = "ip.ip.ip.ip:port"
|
||||
NOXIS_CONFIG_PATH = "./settings.json"
|
||||
NOXIS_METRICS_MODE = "full"
|
||||
NOXIS_METRICS_MODE = "full"
|
||||
NOXIS_SOCKET_PATH = "/path/to/noxis.sock"
|
||||
NOXIS_MAX_LOG_LEVEL = "TRACE"
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "noxis-rs"
|
||||
version = "0.11.26"
|
||||
version = "0.12.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
|
@ -21,3 +21,4 @@ dotenv = "0.15.0"
|
|||
futures = "0.3.31"
|
||||
async-trait = "0.1.88"
|
||||
crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] }
|
||||
lazy_static = "1.5.0"
|
||||
|
|
|
|||
|
|
@ -20,12 +20,12 @@
|
|||
"hostname": "ya.ru",
|
||||
"port": 443,
|
||||
"triggers": {
|
||||
"wait": 10,
|
||||
"onLost": "restart"
|
||||
"wait": 2,
|
||||
"onLost": "stop"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
@ -1,17 +1,13 @@
|
|||
mod options;
|
||||
mod utils;
|
||||
|
||||
use clap::Parser;
|
||||
use log::{error, info};
|
||||
use options::config::*;
|
||||
use options::logger::setup_logger;
|
||||
use options::signals::set_valid_destructor;
|
||||
use options::structs::Processes;
|
||||
use options::cli_pipeline::init_cli_pipeline;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use utils::*;
|
||||
use options::preboot::PrebootParams;
|
||||
use tokio::sync::{broadcast, oneshot};
|
||||
use options::config::v2::init_config_mechanism;
|
||||
|
|
@ -25,23 +21,41 @@ async fn main() -> anyhow::Result<()>{
|
|||
info!("Noxis is configurating...");
|
||||
//
|
||||
let (tx_brd, mut rx_brd) = broadcast::channel::<Processes>(1);
|
||||
// for cli to get config
|
||||
let mut rx_cli_brd = tx_brd.subscribe();
|
||||
// cli <-> config
|
||||
let (tx_oneshot, rx_oneshot) = oneshot::channel::<Processes>();
|
||||
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
||||
|
||||
// initilaizing task for config manipulations
|
||||
// initilaizing task for config manipulations
|
||||
let preboot_config = preboot.clone();
|
||||
let config_module = tokio::spawn(async move {
|
||||
let _ = init_config_mechanism(
|
||||
rx_oneshot,
|
||||
tx_brd,
|
||||
preboot.clone()
|
||||
preboot_config
|
||||
).await;
|
||||
});
|
||||
handler.push(config_module);
|
||||
|
||||
// initilaizing task for cli manipulation
|
||||
let preboot_cli = preboot.clone();
|
||||
let cli_module = tokio::spawn(async move {
|
||||
if let Err(er) = init_cli_pipeline().await {
|
||||
let config = {
|
||||
let mut tick = tokio::time::interval(Duration::from_millis(500));
|
||||
loop {
|
||||
tick.tick().await;
|
||||
break match rx_cli_brd.try_recv() {
|
||||
Ok(conf) => conf,
|
||||
Err(_) => continue,
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Err(er) = init_cli_pipeline(
|
||||
preboot_cli,
|
||||
Arc::new(config),
|
||||
tx_oneshot
|
||||
).await {
|
||||
error!("CLI pipeline failed due to {}", er)
|
||||
}
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,9 +1,17 @@
|
|||
use log::{error, info};
|
||||
use tokio::net::{ UnixStream, UnixListener };
|
||||
use tokio::sync::{Mutex, OnceCell};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{ AsyncWriteExt, AsyncReadExt};
|
||||
use noxis_cli::Cli;
|
||||
use super::structs::Processes;
|
||||
|
||||
use super::preboot::PrebootParams;
|
||||
|
||||
type ConfigGateway = tokio::sync::oneshot::Sender<Processes>;
|
||||
type ProcessedConfigGateway = Arc<Mutex<OnceCell<ConfigGateway>>>;
|
||||
|
||||
/// # Fn `init_cli_pipeline`
|
||||
/// ## for catching all input requests from CLI
|
||||
|
|
@ -18,19 +26,35 @@ use noxis_cli::Cli;
|
|||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
pub async fn init_cli_pipeline() -> anyhow::Result<()> {
|
||||
let socket_path = "noxis.sock";
|
||||
pub async fn init_cli_pipeline(
|
||||
params: Arc<PrebootParams>,
|
||||
config : Arc<Processes>,
|
||||
config_gateway : ConfigGateway,
|
||||
) -> anyhow::Result<()> {
|
||||
let socket_path = ¶ms.self_socket;
|
||||
let _ = fs::remove_file(socket_path);
|
||||
|
||||
let config_gateway = Arc::new(
|
||||
Mutex::new(
|
||||
OnceCell::new_with(Some(config_gateway))
|
||||
)
|
||||
);
|
||||
|
||||
match UnixListener::bind(socket_path) {
|
||||
Ok(list) => {
|
||||
// TODO: remove `unwrap`s
|
||||
info!("Listening on {}", socket_path);
|
||||
info!("Listening on {}", socket_path.display());
|
||||
std::env::set_var("NOXIS_SOCKET_PATH", socket_path);
|
||||
loop {
|
||||
match list.accept().await {
|
||||
Ok((socket, _)) => {
|
||||
// tokio::spawn();
|
||||
process_connection(socket).await;
|
||||
// ??? maybe errors on async work with data transfering between modules
|
||||
let params = params.clone();
|
||||
let config = config.clone();
|
||||
let config_gateway = config_gateway.clone();
|
||||
tokio::spawn(async move {
|
||||
process_connection(socket, params.clone(), config.clone(), config_gateway.clone()).await;
|
||||
});
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Cannot poll connection to CLI due to {}", er);
|
||||
|
|
@ -60,7 +84,7 @@ pub async fn init_cli_pipeline() -> anyhow::Result<()> {
|
|||
///
|
||||
/// *depends on* : `tokio::net::TcpStream`
|
||||
///
|
||||
async fn process_connection(mut stream: UnixStream) {
|
||||
async fn process_connection(mut stream: UnixStream, params: Arc<PrebootParams>, config : Arc<Processes>, cfg_gateway : ProcessedConfigGateway) {
|
||||
let mut buf = vec![0; 1024];
|
||||
match stream.read(&mut buf).await {
|
||||
Ok(0) => {
|
||||
|
|
@ -72,7 +96,16 @@ async fn process_connection(mut stream: UnixStream) {
|
|||
match serde_json::from_slice::<Cli>(&buf) {
|
||||
Ok(cli) => {
|
||||
info!("Received CLI request: {:?}", cli);
|
||||
let response = "OK";
|
||||
let response = match process_cli_cmd(cli, params.clone(), config, cfg_gateway.clone()).await {
|
||||
Ok(response) => {
|
||||
response
|
||||
},
|
||||
Err(er) => {
|
||||
let error_msg = format!("Error: {}", er);
|
||||
error!("{}", &error_msg);
|
||||
error_msg
|
||||
},
|
||||
};
|
||||
if let Err(e) = stream.write_all(response.as_bytes()).await {
|
||||
error!("Failed to send response: {}", e);
|
||||
}
|
||||
|
|
@ -86,3 +119,59 @@ async fn process_connection(mut stream: UnixStream) {
|
|||
}
|
||||
let _ = stream.shutdown().await;
|
||||
}
|
||||
|
||||
|
||||
async fn process_cli_cmd(cli : Cli, params: Arc<PrebootParams>, global_config : Arc<Processes>, cfg_gateway: ProcessedConfigGateway) -> anyhow::Result<String> {
|
||||
use noxis_cli::{Commands, ConfigAction};
|
||||
return match cli.command {
|
||||
Commands::Config(config) => {
|
||||
match config.action {
|
||||
ConfigAction::Show(env ) => {
|
||||
if env.is_env {
|
||||
Ok(serde_json::to_string_pretty(params.as_ref())?)
|
||||
} else {
|
||||
/* */
|
||||
Ok(serde_json::to_string_pretty(global_config.as_ref())?)
|
||||
}
|
||||
},
|
||||
ConfigAction::Reset => {
|
||||
Err(anyhow::Error::msg("It's temporarly forbidden to reset current config using CLI-util"))
|
||||
},
|
||||
ConfigAction::Local(cfg) => {
|
||||
if cfg.is_json {
|
||||
/* */
|
||||
let new_config = serde_json::from_str::<Processes>(&cfg.config)?;
|
||||
let new_version = new_config.get_version().to_string();
|
||||
|
||||
use super::{config::config_comparing, structs::ConfigActuality};
|
||||
|
||||
return match config_comparing(&global_config, &new_config) {
|
||||
ConfigActuality::Remote => {
|
||||
let cfg_gateway = cfg_gateway.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut lock = cfg_gateway.lock().await;
|
||||
match lock.take() {
|
||||
Some(channel) => {
|
||||
let _ = channel.send(new_config);
|
||||
},
|
||||
None => error!("Cannot update confif due to channel sender loss"),
|
||||
}
|
||||
});
|
||||
Ok(format!("Ok. Saving and reloading with version {}", new_version))
|
||||
},
|
||||
_ => Err(anyhow::Error::msg(format!("Local config (version: {}) is more actual", global_config.get_version()))),
|
||||
}
|
||||
} else {
|
||||
Err(anyhow::Error::msg("It's temporarly forbidden to set config in non-json mode"))
|
||||
}
|
||||
},
|
||||
ConfigAction::Remote => {Ok(params.remote_server_url.clone())},
|
||||
/* */
|
||||
// _ => Err(anyhow::Error::msg("Unrecognized command from CLI"))
|
||||
}
|
||||
},
|
||||
/* */
|
||||
Commands::Status => Ok(String::from("Ok")),
|
||||
_ => Ok(String::from("Ok"))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -189,7 +189,7 @@ pub mod v2 {
|
|||
},
|
||||
Ok(_) => {
|
||||
info!("Successfully subscribed to {} pubsub channel", channel_name);
|
||||
let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3)));
|
||||
let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(1)));
|
||||
loop {
|
||||
if let Ok(msg) = pub_sub.get_message() {
|
||||
// dbg!("ok on get message");
|
||||
|
|
@ -325,10 +325,6 @@ pub mod v2 {
|
|||
if !events.is_empty() {
|
||||
warn!("Local config file was overwritten. Discarding changes ...");
|
||||
need_to_export_config = true;
|
||||
// events
|
||||
// .iter()
|
||||
// .any(|event| *event == EventMask::DELETE_SELF)
|
||||
// .then(|| need_to_recreate_watcher = true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -418,259 +414,6 @@ fn load_processes(json_filename: &str) -> Option<Processes> {
|
|||
None
|
||||
}
|
||||
|
||||
/// # Fn `get_actual_config`
|
||||
/// ## for getting actual Monitor's config from local and remote storages
|
||||
///
|
||||
/// *input* : -
|
||||
///
|
||||
/// *output* : `None` on fatal error in mechanisms | `Some(conf)` on finish reading and parsing
|
||||
///
|
||||
/// *initiator* : main thread
|
||||
///
|
||||
/// *managing* : -
|
||||
///
|
||||
/// *depends on* : struct `Processes`
|
||||
///
|
||||
pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes> {
|
||||
// * if no local conf -> loop and +inf getting conf from redis server
|
||||
// * if local conf -> once getting conf from redis server
|
||||
let config_path = params.config.to_str().unwrap_or_else(|| {
|
||||
error!("Invalid character in config file. Config path was set to default");
|
||||
"settings.json"
|
||||
});
|
||||
info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url);
|
||||
match load_processes(config_path) {
|
||||
Some(local_conf) => {
|
||||
info!(
|
||||
"Found local configuration, version - {}",
|
||||
&local_conf.date_of_creation
|
||||
);
|
||||
if !params.no_sub {
|
||||
if let Some(remote_conf) =
|
||||
// TODO : rework with pubsub mech
|
||||
once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url))
|
||||
{
|
||||
return match config_comparing(&local_conf, &remote_conf) {
|
||||
ConfigActuality::Local => {
|
||||
info!("Local config is actual");
|
||||
Some(local_conf)
|
||||
}
|
||||
ConfigActuality::Remote => {
|
||||
info!("Pulled config is more actual. Saving changes!");
|
||||
if save_new_config(&remote_conf, config_path).is_err() {
|
||||
error!("Saving changes process failed due to unexpected error...")
|
||||
}
|
||||
Some(remote_conf)
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Some(local_conf)
|
||||
}
|
||||
None => {
|
||||
warn!("No local valid conf was found. Trying to pull remote one...");
|
||||
if !params.no_sub {
|
||||
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url)));
|
||||
if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
|
||||
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
|
||||
let _ = save_new_config(&conf, config_path);
|
||||
return Some(conf);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `get_remote_conf_watcher`
|
||||
/// ## for infinitive pulling remote config
|
||||
///
|
||||
/// *input* : `&mut Connection`
|
||||
///
|
||||
/// *output* : `None` on fatal error | `Some(conf)` on succesfull pulling
|
||||
///
|
||||
/// *initiator* : fn `get_actual_config`
|
||||
///
|
||||
/// *managing* : mut ref `Connection` object
|
||||
///
|
||||
/// *depends on* : struct `Processes`
|
||||
///
|
||||
async fn get_remote_conf_watcher(conn : &mut Connection) -> Option<Processes> {
|
||||
let mut conn = conn.as_pubsub();
|
||||
let cont = crate::utils::get_container_id();
|
||||
loop {
|
||||
match cont {
|
||||
Some(ref cont) => {
|
||||
let cont = cont.trim();
|
||||
if conn.subscribe(cont).is_err() {
|
||||
// todo : delay
|
||||
continue;
|
||||
}
|
||||
match conn.get_message() {
|
||||
Ok(msg) => {
|
||||
let msg: Result<String, redis::RedisError> = msg.get_payload();
|
||||
if let Ok(payload) = msg {
|
||||
if let Some(remote) = parse_extern_config(&payload) {
|
||||
return Some(remote)
|
||||
}
|
||||
else {
|
||||
error!("Pulled invalid config, cannot start. Waiting for remote conf...");
|
||||
}
|
||||
} else {
|
||||
error!("Cannot get Redis message payload. Waiting for remote conf...");
|
||||
}
|
||||
// todo : delay
|
||||
continue;
|
||||
},
|
||||
Err(_) => {
|
||||
// todo : delay
|
||||
continue;
|
||||
},
|
||||
}
|
||||
},
|
||||
None => {
|
||||
error!("Cannot get container id. Returning");
|
||||
break
|
||||
},
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// # Fn `get_remote_conf_watcher`
|
||||
/// ## for trying to pull remote config
|
||||
///
|
||||
/// > only for situation when local isn't None (no need to fck redis server)
|
||||
///
|
||||
/// *input* : `&str`
|
||||
///
|
||||
/// *output* : `None` on empty pubsub or error | `Some(conf)` on succesfull pulling
|
||||
///
|
||||
/// *initiator* : fn `get_actual_config`
|
||||
///
|
||||
/// *managing* : &str of Redis Server credentials
|
||||
///
|
||||
/// *depends on* : struct `Processes`
|
||||
///
|
||||
fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
|
||||
let cont = crate::utils::get_container_id();
|
||||
match Client::open(serv_info) {
|
||||
Ok(client) => {
|
||||
match client.get_connection() {
|
||||
Ok(mut conn) => {
|
||||
let mut conn = conn.as_pubsub();
|
||||
match conn.subscribe(cont) {
|
||||
Ok(_) => {
|
||||
if conn.set_read_timeout(Some(Duration::from_millis(100))).is_err() {
|
||||
error!("Cannot set reading pubsub timeout and pull remote config");
|
||||
return None;
|
||||
}
|
||||
match conn.get_message() {
|
||||
Ok(msg) => {
|
||||
info!("Pulled config from Redis Server");
|
||||
let get_payload: Result<String, redis::RedisError> = msg.get_payload();
|
||||
match get_payload {
|
||||
Ok(payload) => {
|
||||
let remote = parse_extern_config(&payload);
|
||||
if remote.is_none() {
|
||||
error!("Pulled config is invalid. Check it in Redis Server");
|
||||
}
|
||||
remote
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Cannot extract payload from new message. Check Redis Server state");
|
||||
None
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
None
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis subscription process failed. Check Redis configuration!");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Redis connection attempt is failed. Check Redis configuration!");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Redis-Client opening attempt is failed. Check network configuration!");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ! watchers
|
||||
|
||||
/// # Fn `open_watcher`
|
||||
/// ## for infinitive opening Redis client
|
||||
///
|
||||
/// > only for situation when local isn't None (no need to fck redis server)
|
||||
///
|
||||
/// *input* : `Option<Processes>`
|
||||
///
|
||||
/// *output* : redis::Client on successful opening client
|
||||
///
|
||||
/// *initiator* : fn `get_actual_config`
|
||||
///
|
||||
/// *managing* : &str of Redis Server credentials
|
||||
///
|
||||
/// *depends on* : struct `redis::Client`
|
||||
///
|
||||
fn open_watcher(serv_info: &str) -> Client {
|
||||
loop {
|
||||
match Client::open(serv_info) {
|
||||
Ok(redis) => {
|
||||
info!("Successfully opened Redis-Client");
|
||||
return redis;
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Redis-Client opening attempt is failed. Check network configuration! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `get_connection_watcher`
|
||||
/// ## for infinitive establishing Redis connection on existing client
|
||||
///
|
||||
/// > only for situation when local isn't None (no need to fck redis server)
|
||||
///
|
||||
/// *input* : `&Client`
|
||||
///
|
||||
/// *output* : `Connection`
|
||||
///
|
||||
/// *initiator* : fn `get_actual_config`
|
||||
///
|
||||
/// *managing* : &Client for opening connection
|
||||
///
|
||||
/// *depends on* : struct `redis::Connection`
|
||||
///
|
||||
fn get_connection_watcher(client: &Client) -> Connection {
|
||||
loop {
|
||||
match client.get_connection() {
|
||||
Ok(conn) => {
|
||||
info!("Successfully got Redis connection object");
|
||||
return conn;
|
||||
}
|
||||
Err(_) => {
|
||||
error!(
|
||||
"Redis connection attempt is failed. Check Redis configuration! Retrying..."
|
||||
);
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `restart_main_thread`
|
||||
/// ## for restart monitor with new config
|
||||
///
|
||||
|
|
@ -686,87 +429,10 @@ fn get_connection_watcher(client: &Client) -> Connection {
|
|||
///
|
||||
fn restart_main_thread() -> std::io::Result<()> {
|
||||
let current_exe = env::current_exe()?;
|
||||
Command::new(current_exe).exec();
|
||||
let _ = Command::new(current_exe).exec();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// # Fn `subscribe_config_stream`
|
||||
/// ## for subscribe on changes, pulling to Redis pubsub to get more actual config
|
||||
///
|
||||
/// *input* : `Arc<Processes>`
|
||||
///
|
||||
/// *output* : `Ok(())` on end of work | `Err(er)` on error with subscribing mechanism
|
||||
///
|
||||
/// *initiator* : fn `subscribe_config_stream`
|
||||
///
|
||||
/// *managing* : `Arc<Processes>` to compare old config with new pulled
|
||||
///
|
||||
/// *depends on* : `Processes`
|
||||
///
|
||||
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
|
||||
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
|
||||
|
||||
if params.no_sub {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) {
|
||||
if let Ok(mut conn) = client.get_connection() {
|
||||
match crate::utils::get_container_id() {
|
||||
Some(channel_name) => {
|
||||
let channel_name = channel_name.trim();
|
||||
let mut pubsub = conn.as_pubsub();
|
||||
if pubsub.subscribe(&channel_name).is_ok() {
|
||||
info!("Runner subscribed on config update publishing in channel {}", &channel_name);
|
||||
loop {
|
||||
if let Ok(msg) = pubsub.get_message() {
|
||||
let get_remote_config: Result<String, redis::RedisError> = msg.get_payload();
|
||||
match get_remote_config {
|
||||
Ok(payload) => {
|
||||
if let Some(remote_config) = parse_extern_config(&payload) {
|
||||
match config_comparing(&actual_prcs, &remote_config) {
|
||||
ConfigActuality::Remote => {
|
||||
warn!("Pulled config is actual. Saving and restarting...");
|
||||
if save_new_config(&remote_config, config_path).is_err() {
|
||||
error!("Error with saving new config to {}. Stopping sub mechanism...", config_path);
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
if restart_main_thread().is_err() {
|
||||
error!("Error with restarting Runner. Stopping sub mechanism...");
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
warn!("Pulled new config. Current config is more actual ...");
|
||||
continue
|
||||
},
|
||||
}
|
||||
}
|
||||
else {
|
||||
error!("Invalid conig was pulled");
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Cannot extract new config from message");
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
sleep(Duration::from_secs(30)).await;
|
||||
}
|
||||
} else {
|
||||
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
error!("Cannot get channel name");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("Error with subscribing Redis stream on update. Working only with selected config...");
|
||||
Err(CustomError::Fatal)
|
||||
}
|
||||
|
||||
/// # Fn `config_comparing`
|
||||
/// ## for compare old and new configs
|
||||
///
|
||||
|
|
@ -780,7 +446,7 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<Pr
|
|||
///
|
||||
/// *depends on* : `Processes`, `ConfigActuality`
|
||||
///
|
||||
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
||||
pub fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
||||
if local.is_default() {
|
||||
return ConfigActuality::Remote;
|
||||
}
|
||||
|
|
@ -793,14 +459,6 @@ fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
|||
}
|
||||
}
|
||||
|
||||
// ! TEMPORARILY DEPRECATED !
|
||||
// fn native_date_from_millis(mls: &str) -> Option<chrono::DateTime<Utc>> {
|
||||
// match mls.parse::<i64>(){
|
||||
// Ok(val) => return chrono::DateTime::from_timestamp_millis(val),
|
||||
// Err(_) => return None,
|
||||
// }
|
||||
// }
|
||||
|
||||
/// # Fn `save_new_config`
|
||||
/// ## mechanism for saving new config in local storage
|
||||
///
|
||||
|
|
@ -889,11 +547,6 @@ mod config_unittests {
|
|||
|
||||
assert_eq!(config_comparing(&a, &b), ConfigActuality::Remote);
|
||||
}
|
||||
// TODO : strange output
|
||||
// #[test]
|
||||
// fn get_actual_config_mechanism() {
|
||||
// assert!(get_actual_config().is_some())
|
||||
// }
|
||||
#[test]
|
||||
fn save_config() {
|
||||
let a = Processes {
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ pub fn setup_logger() -> Result<(), crate::options::structs::CustomError> {
|
|||
record.args(),
|
||||
)
|
||||
})
|
||||
.filter(None, LevelFilter::Info)
|
||||
.filter(None, LevelFilter::from_env())
|
||||
.target(env_logger::Target::Stdout)
|
||||
// temporary deprecated
|
||||
// .target(env_logger::Target::Pipe(log_target))
|
||||
|
|
@ -58,6 +58,29 @@ pub fn setup_logger() -> Result<(), crate::options::structs::CustomError> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
trait FromEnv {
|
||||
fn from_env() -> LevelFilter;
|
||||
}
|
||||
|
||||
impl FromEnv for LevelFilter {
|
||||
fn from_env() -> LevelFilter {
|
||||
return match std::env::var("NOXIS_MAX_LOG_LEVEL") {
|
||||
Ok(var) => {
|
||||
match var.to_ascii_lowercase().trim().as_ref() {
|
||||
"trace" => LevelFilter::Trace,
|
||||
"debug" => LevelFilter::Debug,
|
||||
"info" => LevelFilter::Info,
|
||||
"error" => LevelFilter::Error,
|
||||
"warn" => LevelFilter::Warn,
|
||||
"off" => LevelFilter::Off,
|
||||
_ => LevelFilter::Info,
|
||||
}
|
||||
},
|
||||
Err(_) => LevelFilter::Info,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod logger_tests {
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
//!
|
||||
#[allow(unused_imports)]
|
||||
use anyhow::{Result, Error};
|
||||
use log::warn;
|
||||
use std::path::PathBuf;
|
||||
use std::env::var;
|
||||
use dotenv::dotenv;
|
||||
|
|
@ -19,9 +20,9 @@ use dotenv::dotenv;
|
|||
/// noxis-rs ... --metrics none
|
||||
/// ```
|
||||
///
|
||||
#[derive(clap::ValueEnum, Debug, Clone)]
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub enum MetricsPrebootParams {
|
||||
Full,
|
||||
Full,
|
||||
System,
|
||||
Processes,
|
||||
Net,
|
||||
|
|
@ -176,17 +177,18 @@ impl std::fmt::Display for MetricsPrebootParams {
|
|||
/// export NOXIS_METRICS_MODE "full"
|
||||
/// ```
|
||||
///
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct PrebootParams {
|
||||
pub no_hostagent : bool,
|
||||
// pub no_hostagent : bool,
|
||||
pub no_logs: bool,
|
||||
pub refresh_logs : bool,
|
||||
pub no_sub : bool,
|
||||
pub socket_path : PathBuf,
|
||||
// pub socket_path : PathBuf,
|
||||
pub log_to : PathBuf,
|
||||
pub remote_server_url : String,
|
||||
pub config : PathBuf,
|
||||
pub metrics: MetricsPrebootParams,
|
||||
pub self_socket : PathBuf,
|
||||
}
|
||||
|
||||
/// # implementation for `MetricsPrebootParams`
|
||||
|
|
@ -196,12 +198,12 @@ impl PrebootParams {
|
|||
dotenv().ok();
|
||||
Self {
|
||||
// bool
|
||||
no_hostagent : {
|
||||
match var("NOXIS_NO_HAGENT") {
|
||||
Ok(_) => true,
|
||||
Err(_) => false,
|
||||
}
|
||||
},
|
||||
// no_hostagent : {
|
||||
// match var("NOXIS_NO_HAGENT") {
|
||||
// Ok(_) => true,
|
||||
// Err(_) => false,
|
||||
// }
|
||||
// },
|
||||
no_logs : {
|
||||
match var("NOXIS_NO_LOGS") {
|
||||
Ok(_) => true,
|
||||
|
|
@ -221,12 +223,12 @@ impl PrebootParams {
|
|||
}
|
||||
},
|
||||
// vals
|
||||
socket_path : {
|
||||
match var("NOXIS_HAGENT_SOCKET_PATH") {
|
||||
Ok(val) => PathBuf::from(val),
|
||||
Err(_) => PathBuf::from("/var/run/enode/hostagent.sock"),
|
||||
}
|
||||
},
|
||||
// socket_path : {
|
||||
// match var("NOXIS_HAGENT_SOCKET_PATH") {
|
||||
// Ok(val) => PathBuf::from(val),
|
||||
// Err(_) => PathBuf::from("/var/run/enode/hostagent.sock"),
|
||||
// }
|
||||
// },
|
||||
log_to : {
|
||||
match var("NOXIS_LOG_TO") {
|
||||
Ok(val) => PathBuf::from(val),
|
||||
|
|
@ -251,6 +253,16 @@ impl PrebootParams {
|
|||
Err(_) => MetricsPrebootParams::Full,
|
||||
}
|
||||
},
|
||||
self_socket : {
|
||||
match var("NOXIS_SOCKET_PATH") {
|
||||
Ok(val) => PathBuf::from(val),
|
||||
Err(_) => {
|
||||
let default = std::env::current_dir().expect("Crushed on getting current_dir path. Check fs state!");
|
||||
warn!("$NOXIS_SOCKET_PATH wans't set. Default value - {}", default.display());
|
||||
PathBuf::from(default)
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
use super::structs::CustomError;
|
||||
use std::sync::Arc;
|
||||
use tokio::io;
|
||||
use tokio::sync::mpsc;
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
|
|||
use async_trait::async_trait;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum DependencyType {
|
||||
File,
|
||||
|
|
@ -93,7 +94,21 @@ pub enum ProcessState {
|
|||
Holding,
|
||||
Stopped,
|
||||
StoppedByCli,
|
||||
HoldingByCli,
|
||||
}
|
||||
impl std::fmt::Display for ProcessState {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
return match self {
|
||||
ProcessState::Pending => write!(f, "Running"),
|
||||
ProcessState::Holding => write!(f, "Holding"),
|
||||
ProcessState::Stopped => write!(f, "Stopped"),
|
||||
ProcessState::StoppedByCli => write!(f, "Forcibly stopped"),
|
||||
ProcessState::HoldingByCli => write!(f, "Forcibly holding"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Events {
|
||||
Positive(Arc<str>),
|
||||
|
|
@ -158,6 +173,9 @@ impl Processes {
|
|||
pub fn is_default(&self) -> bool {
|
||||
self.date_of_creation.is_empty()
|
||||
}
|
||||
pub fn get_version(&self) -> &str {
|
||||
&self.date_of_creation
|
||||
}
|
||||
}
|
||||
|
||||
/// # Struct for the 2nd level in json conf file
|
||||
|
|
|
|||
|
|
@ -4,31 +4,23 @@ pub mod metrics;
|
|||
pub mod prcs;
|
||||
pub mod services;
|
||||
|
||||
// TODO : saving current flags state
|
||||
|
||||
use crate::options::structs::{CustomError, TrackingProcess, Processes};
|
||||
// use files::create_watcher;
|
||||
// use files::file_handler;
|
||||
// use inotify::Inotify;
|
||||
use log::{error, warn, info};
|
||||
use prcs::{
|
||||
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
|
||||
unfreeze_process,
|
||||
};
|
||||
// use services::service_handler;
|
||||
use crate::options::structs::Processes;
|
||||
use log::{error, info};
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
// use tokio::join;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Duration;
|
||||
// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
|
||||
// controllers import
|
||||
use prcs::v2::ProcessesController;
|
||||
use files::v2::FilesController;
|
||||
use services::v2::ServicesController;
|
||||
use async_trait::async_trait;
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
const GET_ID_CMD: &str = "hostname";
|
||||
lazy_static! {
|
||||
static ref GET_ID_CMD : &'static str = "hostname";
|
||||
}
|
||||
|
||||
// const GET_ID_CMD: &str = "hostname";
|
||||
|
||||
pub mod v2 {
|
||||
use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
|
||||
|
|
@ -47,14 +39,16 @@ pub mod v2 {
|
|||
prcs : LinkedList<ProcessesController>,
|
||||
files : LinkedList<FilesController>,
|
||||
services : LinkedList<ServicesController>,
|
||||
config : Arc<Processes>,
|
||||
}
|
||||
|
||||
impl Supervisor {
|
||||
pub fn new() -> Supervisor {
|
||||
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
|
||||
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()) }
|
||||
}
|
||||
pub async fn with_config(mut self, config: &Processes) -> Supervisor {
|
||||
let _ = config.processes.iter()
|
||||
pub async fn with_config(mut self, config: Processes) -> Supervisor {
|
||||
self.config = Arc::from(config);
|
||||
let _ = self.config.processes.iter()
|
||||
.for_each(|prc| {
|
||||
let (rx, tx) = mpsc::channel::<Events>(10);
|
||||
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
|
||||
|
|
@ -127,6 +121,9 @@ pub mod v2 {
|
|||
async fn process(&mut self) {
|
||||
info!("Initializing monitoring ...");
|
||||
loop {
|
||||
//
|
||||
// todo: CHANNEL check and reaction
|
||||
//
|
||||
// dbg!(&self);
|
||||
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
|
||||
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
|
||||
|
|
@ -177,242 +174,13 @@ pub mod v2 {
|
|||
pub async fn init_monitoring(
|
||||
config: Processes
|
||||
) -> anyhow::Result<()> {
|
||||
let mut supervisor = Supervisor::new().with_config(&config).await;
|
||||
let mut supervisor = Supervisor::new().with_config(config).await;
|
||||
info!("Monitoring: {} ", &supervisor.get_stats());
|
||||
supervisor.process().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
// async fn generate_controllers<'a>(config: Processes) -> (HashSet<ProcessesController<'a>>, HashSet<FilesController<'a>>, HashSet<ServicesController<'a>>) {
|
||||
// let mut prcs: HashSet<ProcessesController<'a>> = HashSet::new();
|
||||
// let mut files: HashSet<FilesController<'a>> = HashSet::new();
|
||||
// let mut services: HashSet<ServicesController<'a>> = HashSet::new();
|
||||
// for prc in config.processes {
|
||||
// let (rx, tx) = mpsc::channel::<Events<'a>>(10);
|
||||
// // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path);
|
||||
// let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path);
|
||||
// let a = new_prc.process().await;
|
||||
|
||||
// }
|
||||
// (prcs, files, services)
|
||||
// }
|
||||
// spawn prc check with semaphore check
|
||||
async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) }
|
||||
|
||||
// spawn file check with semaphore check
|
||||
async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) }
|
||||
|
||||
// spawn service check with semaphore check
|
||||
async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) }
|
||||
}
|
||||
|
||||
/// # Fn `run_daemons`
|
||||
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
|
||||
///
|
||||
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `&mut mpsc::Receiver<u8>`,
|
||||
///
|
||||
/// *output* : ()
|
||||
///
|
||||
/// *initiator* : main thread
|
||||
///
|
||||
/// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader
|
||||
///
|
||||
/// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher`
|
||||
///
|
||||
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
|
||||
///
|
||||
// pub async fn run_daemons(
|
||||
// proc: Arc<TrackingProcess>,
|
||||
// tx: Arc<mpsc::Sender<u8>>,
|
||||
// rx: &mut mpsc::Receiver<u8>,
|
||||
// ) {
|
||||
// // creating watchers + ---buffers---
|
||||
// let mut watchers: Vec<Inotify> = vec![];
|
||||
// for file in proc.dependencies.files.clone().into_iter() {
|
||||
// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
|
||||
// watchers.push(watcher);
|
||||
// } else {
|
||||
// let _ = tx.send(121).await;
|
||||
// }
|
||||
// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
|
||||
// }
|
||||
// let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
|
||||
// Arc::new(tokio::sync::Mutex::new(watchers));
|
||||
|
||||
// loop {
|
||||
// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
|
||||
// tokio::select! {
|
||||
// _ = run_hand => continue,
|
||||
// _val = rx.recv() => {
|
||||
// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
|
||||
// return;
|
||||
// }
|
||||
// },
|
||||
// }
|
||||
// tokio::task::yield_now().await;
|
||||
// }
|
||||
// }
|
||||
|
||||
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
|
||||
match val {
|
||||
// 1 - File-dependency handling error -> terminating (after waiting)
|
||||
1 => {
|
||||
if is_active(&proc.name).await {
|
||||
error!("File-dependency handling error: Terminating {} process ..." , &proc.name);
|
||||
terminate_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
// return;
|
||||
},
|
||||
// 2 - File-dependency handling error -> holding (after waiting)
|
||||
2 => {
|
||||
if !is_frozen(&proc.name).await {
|
||||
error!("File-dependency handling error: Freezing {} process ..." , &proc.name);
|
||||
freeze_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
},
|
||||
// 3 - Running process error
|
||||
3 => {
|
||||
error!("Error due to starting {} process", &proc.name);
|
||||
return Err(CustomError::Fatal)
|
||||
},
|
||||
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
|
||||
4 => {
|
||||
// warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
},
|
||||
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
|
||||
5 => {
|
||||
if is_active(&proc.name).await {
|
||||
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
|
||||
terminate_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
},
|
||||
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
|
||||
6 => {
|
||||
// println!("holding {}-{}", proc.name, is_active(&proc.name).await);
|
||||
if !is_frozen(&proc.name).await {
|
||||
error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
|
||||
freeze_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
},
|
||||
// // 7 - File-dependency change -> terminating (after check)
|
||||
7 => {
|
||||
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
|
||||
terminate_process(&proc.name).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
return Err(CustomError::Fatal)
|
||||
},
|
||||
// // 8 - File-dependency change -> restarting (after check)
|
||||
8 => {
|
||||
warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
|
||||
let _ = restart_process(&proc.name, &proc.path).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
},
|
||||
// // 9 - File-dependency change -> staying (after check)
|
||||
9 => {
|
||||
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
},
|
||||
|
||||
// 10 - Process unfreaze call via file handler (or service handler)
|
||||
10 | 11 => {
|
||||
if is_frozen(&proc.name).await {
|
||||
warn!("Unfreezing process {} call...", &proc.name);
|
||||
unfreeze_process(&proc.name).await;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
},
|
||||
// 11 - Process unfreaze call via service handler
|
||||
// 11 => {
|
||||
// if is_frozen(&proc.name).await {
|
||||
// warn!("Unfreezing process {} call...", &proc.name);
|
||||
// unfreeze_process(&proc.name).await;
|
||||
// }
|
||||
// tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
// },
|
||||
// 101 - Impermissible trigger values in JSON
|
||||
101 => {
|
||||
error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", &proc.name);
|
||||
if is_active(&proc.name).await {
|
||||
terminate_process(&proc.name).await;
|
||||
}
|
||||
return Err(CustomError::Fatal)
|
||||
},
|
||||
//
|
||||
// 121 - Cannot create valid watcher for file dependency
|
||||
// todo : think about valid situation
|
||||
121 => {
|
||||
error!("Cannot create valid watcher for file dependency. Terminating {} process...", &proc.name);
|
||||
let _ = terminate_process(&proc.name).await;
|
||||
return Err(CustomError::Fatal)
|
||||
},
|
||||
// 111 - global thread termination with killing current child in a face
|
||||
// of a current process
|
||||
111 => {
|
||||
warn!("Terminating {}'s child processes...", &proc.name);
|
||||
match is_active(&proc.name).await {
|
||||
true => {
|
||||
terminate_process(&proc.name).await;
|
||||
},
|
||||
false => {
|
||||
log::info!("Process {} is already terminated!", proc.name);
|
||||
},
|
||||
}
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// check process status daemon
|
||||
/// # Fn `run_daemons`
|
||||
/// ## func to async exec subjobs of checking process, services and files states
|
||||
///
|
||||
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
||||
///
|
||||
/// *output* : ()
|
||||
///
|
||||
/// *initiator* : fn `run_daemons`
|
||||
///
|
||||
/// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers
|
||||
///
|
||||
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
|
||||
///
|
||||
// pub async fn running_handler(
|
||||
// prc: Arc<TrackingProcess>,
|
||||
// tx: Arc<mpsc::Sender<u8>>,
|
||||
// watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
||||
// ) {
|
||||
// // services and files check (once)
|
||||
// let files_check = file_handler(
|
||||
// &prc.name,
|
||||
// &prc.dependencies.files,
|
||||
// tx.clone(),
|
||||
// watchers.clone(),
|
||||
// );
|
||||
// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
|
||||
|
||||
// let res = join!(files_check, services_check);
|
||||
// // if inactive -> spawn checks -> active is true
|
||||
// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
||||
// if start_process(&prc.name, &prc.path).await.is_err() {
|
||||
// tx.send(3).await.unwrap();
|
||||
// return;
|
||||
// }
|
||||
// }
|
||||
// // if frozen -> spawn checks -> unfreeze is true
|
||||
// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
||||
// tx.send(10).await.unwrap();
|
||||
// return;
|
||||
// }
|
||||
// // tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
// tokio::task::yield_now().await;
|
||||
// }
|
||||
|
||||
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
|
||||
/// # Fn `get_container_id`
|
||||
/// ## for getting container id used in logs
|
||||
|
|
@ -428,7 +196,7 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
|
|||
/// *depends on* : -
|
||||
///
|
||||
pub fn get_container_id() -> Option<String> {
|
||||
match Command::new(GET_ID_CMD).output() {
|
||||
match Command::new(*GET_ID_CMD).output() {
|
||||
Ok(output) => {
|
||||
if !output.status.success() {
|
||||
return None;
|
||||
|
|
|
|||
|
|
@ -1,16 +1,12 @@
|
|||
use crate::options::structs::{CustomError, Files};
|
||||
use super::prcs::{is_active, is_frozen};
|
||||
use inotify::{EventMask, Inotify, WatchMask};
|
||||
use std::borrow::BorrowMut;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::Sender as Sender;
|
||||
use tokio::time::Duration;
|
||||
use crate::options::structs::Events;
|
||||
use async_trait::async_trait;
|
||||
use crate::options::structs::CustomError;
|
||||
use inotify::{EventMask, Inotify, WatchMask};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::Sender as Sender;
|
||||
use crate::options::structs::Events;
|
||||
use async_trait::async_trait;
|
||||
|
||||
pub mod v2 {
|
||||
pub mod v2 {
|
||||
use log::{error, info, warn};
|
||||
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
|
||||
use super::*;
|
||||
|
|
@ -42,6 +38,7 @@
|
|||
}
|
||||
|
||||
impl FilesController {
|
||||
#[inline(always)]
|
||||
pub fn new(name: &str, triggers: EventHandlers) -> FilesController {
|
||||
let name: Arc<str> = Arc::from(name);
|
||||
Self {
|
||||
|
|
@ -53,6 +50,7 @@
|
|||
code_name : name.clone(),
|
||||
}
|
||||
}
|
||||
#[inline(always)]
|
||||
pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController> {
|
||||
self.path = path.as_ref().to_string_lossy().into_owned();
|
||||
self.watcher = {
|
||||
|
|
@ -90,26 +88,23 @@
|
|||
#[async_trait]
|
||||
impl ProcessUnit for FilesController {
|
||||
async fn process(&mut self) {
|
||||
// polling file check
|
||||
// 1) existing check
|
||||
// dbg!(&self);
|
||||
if let Ok(_) = check_file(&self.name, &self.path).await {
|
||||
if let FileState::NotFound = self.state {
|
||||
info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name);
|
||||
self.state = FileState::Ok;
|
||||
// reseting negative outcome in prc
|
||||
self.trigger_on(None).await;
|
||||
}
|
||||
match &mut self.watcher {
|
||||
Some(notify) => {
|
||||
let mut buffer = [0; 1024];
|
||||
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
|
||||
// notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);});
|
||||
// todo!();
|
||||
if let (recreate_watcher, true) = (
|
||||
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
|
||||
notif_events.any(|mask| mask.mask == EventMask::MODIFY)
|
||||
) {
|
||||
let mut buffer = [0; 128];
|
||||
if let Ok(notif_events) = notify.read_events(&mut buffer) {
|
||||
let (need_to_recreate, was_modifired) = notif_events.fold((false, false), |(a, b), mask| {
|
||||
(
|
||||
a || mask.mask == EventMask::DELETE_SELF,
|
||||
b || mask.mask == EventMask::MODIFY,
|
||||
)
|
||||
});
|
||||
if let (recreate_watcher, true) = (need_to_recreate, was_modifired) {
|
||||
warn!("File {} ({}) was changed", self.name, &self.path);
|
||||
if recreate_watcher {
|
||||
self.watcher = match create_watcher(&self.name, &self.path) {
|
||||
|
|
@ -140,10 +135,9 @@
|
|||
return;
|
||||
}
|
||||
self.trigger_on(None).await;
|
||||
// 2) change check
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `create_watcher`
|
||||
/// ## for creating watcher on file's delete | update events
|
||||
|
|
@ -165,105 +159,6 @@
|
|||
Ok(inotify)
|
||||
}
|
||||
|
||||
/// # Fn `create_watcher`
|
||||
/// ## for managing processes by checking dep files' states
|
||||
///
|
||||
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
||||
///
|
||||
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
|
||||
///
|
||||
/// *initiator* : fn `utils::running_handler`
|
||||
///
|
||||
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
||||
///
|
||||
/// *depends on* : Files
|
||||
///
|
||||
pub async fn file_handler(
|
||||
name: &str,
|
||||
files: &[Files],
|
||||
tx: Arc<mpsc::Sender<u8>>,
|
||||
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
for (i, file) in files.iter().enumerate() {
|
||||
// let src = format!("{}{}", file.src, file.filename);
|
||||
if check_file(&file.filename, &file.src).await.is_err() {
|
||||
if !is_active(name).await || is_frozen(name).await {
|
||||
return Err(anyhow::Error::msg("Process is frozen or stopped"));
|
||||
}
|
||||
match file.triggers.on_delete.as_str() {
|
||||
"stay" => {
|
||||
tx.send(9).await.unwrap();
|
||||
continue;
|
||||
}
|
||||
"stop" => {
|
||||
if is_active(name).await {
|
||||
tx.send(1).await.unwrap();
|
||||
}
|
||||
return Err(anyhow::Error::msg("Process was stopped"));
|
||||
}
|
||||
"hold" => {
|
||||
if is_active(name).await {
|
||||
tx.send(2).await.unwrap();
|
||||
return Err(anyhow::Error::msg("Process was frozen"));
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
tx.send(101).await.unwrap();
|
||||
return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
|
||||
}
|
||||
}
|
||||
} else if is_active(name).await && !is_frozen(name).await {
|
||||
let watchers = watchers.clone();
|
||||
// println!("mutex: {:?}", watchers);
|
||||
let mut buffer = [0; 128];
|
||||
let mut mutex_guard = watchers.lock().await;
|
||||
if let Some(notify) = mutex_guard.get_mut(i) {
|
||||
let events = notify.read_events(&mut buffer);
|
||||
// println!("{:?}", events);
|
||||
if events.is_ok() {
|
||||
let events: Vec<EventMask> = events
|
||||
.unwrap()
|
||||
.map(|mask| mask.mask)
|
||||
.filter(|mask| {
|
||||
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
|
||||
})
|
||||
.collect();
|
||||
for event in events {
|
||||
if let EventMask::DELETE_SELF = event {
|
||||
// ! warning (DELETE_SELF event) !
|
||||
// println!("! warning (DELETE_SELF event) !");
|
||||
// * watcher recreation after dealing with file recreation mechanism in text editors
|
||||
let mutex = notify.borrow_mut();
|
||||
|
||||
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
|
||||
if let Ok(watcher) = create_watcher(&file.filename, &file.src) {
|
||||
*mutex = watcher;
|
||||
}
|
||||
}
|
||||
match file.triggers.on_change.as_str() {
|
||||
"stop" => {
|
||||
let _ = tx.send(7).await;
|
||||
}
|
||||
"restart" => {
|
||||
let _ = tx.send(8).await;
|
||||
}
|
||||
"stay" => {
|
||||
let _ = tx.send(9).await;
|
||||
}
|
||||
_ => {
|
||||
let _ = tx.send(101).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::task::yield_now().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// # Fn `check_file`
|
||||
/// ## for checking existance of current file
|
||||
///
|
||||
|
|
|
|||
|
|
@ -2,18 +2,136 @@
|
|||
// cpu load, ram/rom load and net activity
|
||||
|
||||
// use std::sync::Mutex;
|
||||
use std::sync::Arc;
|
||||
use crate::options::structs::TrackingProcess;
|
||||
use sysinfo::{Process, System};
|
||||
use tokio::join;
|
||||
use crate::options::structs::{ProcessMetrics, ContainerMetrics};
|
||||
use super::get_container_id;
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
use crate::options::structs::{ProcessState, TrackingProcess};
|
||||
use sysinfo::{System, Disks as DisksList, Networks};
|
||||
use crate::options::structs::Dependencies;
|
||||
use serde::Serialize;
|
||||
use super::prcs::v2::Pid;
|
||||
// use pcap::{Device, Capture, Active};
|
||||
// use std::net::Ipv4Addr;
|
||||
// use anyhow::{Result, Ok};
|
||||
|
||||
// type PacketBuffer = Arc<Mutex<Vec<PacketInfo>>>;
|
||||
|
||||
type CoreUsage = BTreeMap<usize, CoreInfo>;
|
||||
type Disks = Vec<Disk>;
|
||||
type Ifaces = Vec<Network>;
|
||||
pub type MetricProcesses = Vec<ProcessesExtended>;
|
||||
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct FullMetrics {
|
||||
hostname : String,
|
||||
os : String,
|
||||
kernel : String,
|
||||
cpu : Cpu,
|
||||
ram : Ram,
|
||||
disks : Disks,
|
||||
networks : Ifaces,
|
||||
processes : MetricProcesses,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct HostInfo {
|
||||
hostname : String,
|
||||
os : String,
|
||||
kernel : String,
|
||||
}
|
||||
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct Cpu {
|
||||
global_usage : f32,
|
||||
usage : CoreUsage,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct CoreInfo {
|
||||
name: String,
|
||||
brand : String,
|
||||
frequency : u64,
|
||||
vendor_id : String,
|
||||
usage : f32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct Ram {
|
||||
free_mem : u64,
|
||||
free_swap : u64,
|
||||
total_mem : u64,
|
||||
total_swap : u64
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct Disk {
|
||||
name : String,
|
||||
kind : String,
|
||||
fs : String,
|
||||
mount_point : String,
|
||||
total_space : u64,
|
||||
available_space : u64,
|
||||
is_removable : bool,
|
||||
is_readonly : bool,
|
||||
}
|
||||
|
||||
// vec<Network>
|
||||
#[derive(Serialize, Debug)]
|
||||
struct Network {
|
||||
iname : String,
|
||||
mac : String,
|
||||
recieved : u64,
|
||||
transmitted : u64,
|
||||
total_recieved_bytes : u64,
|
||||
total_transmitted_bytes : u64,
|
||||
total_recieved_packets : u64,
|
||||
total_transmitted_packets : u64,
|
||||
errors_on_recieved : u64,
|
||||
errors_on_transmitted : u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
pub struct ProcessesExtended {
|
||||
name : String,
|
||||
status : String,
|
||||
pid : Pid,
|
||||
dependencies : Dependencies,
|
||||
cpu_usage : f32,
|
||||
ram_usage : f32,
|
||||
virtual_mem_usage : u64,
|
||||
disks_usage_read_bytes: u64,
|
||||
disks_usage_write_bytes: u64,
|
||||
}
|
||||
|
||||
impl ProcessesExtended {
|
||||
pub fn from_old_with_params(
|
||||
old : Arc<TrackingProcess>,
|
||||
pid : Pid,
|
||||
status : ProcessState,
|
||||
) -> Self {
|
||||
Self {
|
||||
name : old.name.clone(),
|
||||
status : status.to_string(),
|
||||
pid,
|
||||
dependencies : old.dependencies.clone(),
|
||||
cpu_usage : 0.0,
|
||||
ram_usage : 0.0,
|
||||
virtual_mem_usage : 0,
|
||||
disks_usage_read_bytes: 0,
|
||||
disks_usage_write_bytes: 0,
|
||||
}
|
||||
}
|
||||
fn add_metrics(&mut self, system : &mut System) {
|
||||
if let Some(prc) = system.process(self.pid.new_sysinfo_pid()) {
|
||||
self.cpu_usage = prc.cpu_usage() / system.cpus().len() as f32;
|
||||
self.ram_usage = (system.total_memory() as f32) / (prc.memory() as f32);
|
||||
self.disks_usage_read_bytes = prc.disk_usage().total_read_bytes;
|
||||
self.disks_usage_write_bytes = prc.disk_usage().total_written_bytes;
|
||||
self.virtual_mem_usage = prc.virtual_memory();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `init_metrics_grubber`
|
||||
/// ## for initializing process of unstoppable grubbing metrics.
|
||||
///
|
||||
|
|
@ -28,204 +146,117 @@ use super::get_container_id;
|
|||
/// *depends on* : -
|
||||
///
|
||||
#[allow(dead_code)]
|
||||
pub async fn init_metrics_grubber() {
|
||||
pub async fn init_metrics_grubber(
|
||||
/* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */
|
||||
) {
|
||||
let mut system = System::new();
|
||||
// let mut buffer: Vec<PacketInfo> = vec![];
|
||||
// let shared_buf: PacketBuffer = Arc::new(Mutex::new(buffer));
|
||||
get_all_metrics(&mut system).await;
|
||||
}
|
||||
|
||||
async fn get_all_metrics(system: &mut System) {
|
||||
system.refresh_all();
|
||||
// let temp = String::from_utf8(get_pid("systemd").await.unwrap().stdout).unwrap();
|
||||
// let prc = system.process(Pid::from_str(&temp).unwrap()).unwrap();
|
||||
// prc.
|
||||
// let _ = capture_packets(shared_buf.clone()).await;
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
dbg!(get_host_info().await);
|
||||
dbg!(get_cpu_metrics(system).await);
|
||||
dbg!(get_ram_metrics(system).await);
|
||||
dbg!(get_all_disks_metrics().await);
|
||||
dbg!(get_all_ifaces_metrics().await);
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[allow(unused_variables)]
|
||||
async fn gather_metrics(proc: Arc<Process>) {
|
||||
|
||||
async fn get_host_info() -> HostInfo {
|
||||
HostInfo {
|
||||
hostname : System::host_name().unwrap_or_default(),
|
||||
os : System::long_os_version().unwrap_or_default(),
|
||||
kernel : System::kernel_version().unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
|
||||
// DEPRECATED : for net monitoring
|
||||
// async fn capture_packets(buffer: PacketBuffer) -> Result<()> {
|
||||
// let mut cap = Capture::from_device(Device::lookup()?.unwrap())?
|
||||
// .promisc(true)
|
||||
// .open()?;
|
||||
async fn get_cpu_metrics(system: &mut System) -> Cpu {
|
||||
system.refresh_cpu_all();
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
|
||||
// cap.filter("not broadcast and not multicast", true)?;
|
||||
let mut buffer = CoreUsage::new();
|
||||
let global_usage = system.global_cpu_usage();
|
||||
|
||||
// while let core::result::Result::Ok(packet) = cap.next_packet() {
|
||||
// if let Some((src, dst, prot)) = get_packet_info(&packet.data).await {
|
||||
// let packet_info = PacketInfo::new(String::from(prot), dst, src, packet.header.len as usize);
|
||||
// let mut locked_buffer = buffer.lock().unwrap();
|
||||
// println!("{:?}", &packet_info);
|
||||
// locked_buffer.push(packet_info);
|
||||
// }
|
||||
// }
|
||||
// Ok(())
|
||||
// }
|
||||
// async fn get_packet_info(data: &[u8]) -> Option<(Ipv4Addr, Ipv4Addr, &str)> {
|
||||
// if data.len() >= 20 {
|
||||
// let src_ip = Ipv4Addr::new(data[12], data[13], data[14], data[15]);
|
||||
// let dst_ip = Ipv4Addr::new(data[16], data[17], data[18], data[19]);
|
||||
// let protocol = match data[9] {
|
||||
// 1 => "ICMP",
|
||||
// 6 => "TCP",
|
||||
// 17 => "UDP",
|
||||
// _ => "Unknown",
|
||||
// };
|
||||
|
||||
// Some((src_ip, dst_ip, protocol))
|
||||
// } else {
|
||||
// None
|
||||
// }
|
||||
// }
|
||||
system.cpus()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.for_each(|(id, cpu)| {
|
||||
let core_info = CoreInfo {
|
||||
// id,
|
||||
brand : cpu.brand().to_string(),
|
||||
name : cpu.name().to_string(),
|
||||
frequency : cpu.frequency(),
|
||||
vendor_id : cpu.vendor_id().to_string(),
|
||||
usage : cpu.cpu_usage(),
|
||||
};
|
||||
// buffer.push(core_info);
|
||||
buffer.entry(id).or_insert(core_info);
|
||||
});
|
||||
|
||||
|
||||
/// # Fn `get_all_container_metrics`
|
||||
/// ## for gathering all container (whole system metrics)
|
||||
///
|
||||
/// *input* : `Arc<System>`, `Arc<Vec<TrackingProcess>>`
|
||||
///
|
||||
/// *output* : `ContainerMetrics`
|
||||
///
|
||||
/// *initiator* : main thread ??
|
||||
///
|
||||
/// *managing* : ref counter to `System` object, ref counter to list of processes
|
||||
///
|
||||
/// *depends on* : `TrackingProcess`
|
||||
///
|
||||
#[allow(dead_code)]
|
||||
async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProcess>>) -> ContainerMetrics {
|
||||
let metrics = join!(
|
||||
get_cpu_metrics_container(sys.clone()),
|
||||
get_ram_metrics_container(sys.clone()),
|
||||
get_subsystems(prcs.clone())
|
||||
);
|
||||
ContainerMetrics::new(
|
||||
&get_container_id().unwrap_or(String::from("unknown")),
|
||||
metrics.0,
|
||||
metrics.1,
|
||||
metrics.2
|
||||
)
|
||||
Cpu {
|
||||
global_usage,
|
||||
usage: buffer
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `get_cpu_metrics_container`
|
||||
/// ## for gathering container cpu metrics
|
||||
///
|
||||
/// *input* : `Arc<System>`
|
||||
///
|
||||
/// *output* : `f32`
|
||||
///
|
||||
/// *initiator* : main thread ??
|
||||
///
|
||||
/// *managing* : ref counter to `System` object
|
||||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
#[allow(dead_code)]
|
||||
async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 {
|
||||
sys.global_cpu_usage()
|
||||
async fn get_ram_metrics(system: &mut System) -> Ram {
|
||||
system.refresh_memory();
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
|
||||
Ram {
|
||||
free_mem : system.free_memory(),
|
||||
free_swap : system.free_swap(),
|
||||
total_mem : system.total_memory(),
|
||||
total_swap : system.total_swap(),
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `get_ram_metrics_container`
|
||||
/// ## for gathering container ram metrics
|
||||
///
|
||||
/// *input* : `Arc<System>`
|
||||
///
|
||||
/// *output* : `f32`
|
||||
///
|
||||
/// *initiator* : main thread ??
|
||||
///
|
||||
/// *managing* : ref counter to `System` object
|
||||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
#[allow(dead_code)]
|
||||
async fn get_ram_metrics_container(sys: Arc<System>) -> f32 {
|
||||
(sys.used_memory() / sys.total_memory()) as f32 * 100.0
|
||||
}
|
||||
// async fn get_mem_metrics_container(sys: Arc<System>) -> f32 {
|
||||
// sys.
|
||||
// }
|
||||
|
||||
/// # Fn `get_subsystems`
|
||||
/// ## for gathering info about container subsystems (processes)
|
||||
///
|
||||
/// *input* : `Arc<Vec<TrackingProcess>>`
|
||||
///
|
||||
/// *output* : `Vec<String>`
|
||||
///
|
||||
/// *initiator* : main thread ??
|
||||
///
|
||||
/// *managing* : ref counter to list of `TrackingProcess`
|
||||
///
|
||||
/// *depends on* : `TrackingProcess`
|
||||
///
|
||||
#[allow(dead_code)]
|
||||
async fn get_subsystems(prcs: Arc<Vec<TrackingProcess>>) -> Vec<String> {
|
||||
prcs.iter().map(|process| process.name.clone()).collect()
|
||||
async fn get_all_disks_metrics() -> Disks {
|
||||
let disks = DisksList::new_with_refreshed_list();
|
||||
let mut buffer = Disks::new();
|
||||
disks.list()
|
||||
.iter()
|
||||
.for_each(|disk| {
|
||||
let disk = Disk {
|
||||
name : disk.name().to_string_lossy().into_owned(),
|
||||
kind: disk.kind().to_string(),
|
||||
fs : disk.file_system().to_string_lossy().into_owned(),
|
||||
mount_point : disk.mount_point().to_string_lossy().into_owned(),
|
||||
total_space : disk.total_space(),
|
||||
available_space : disk.available_space(),
|
||||
is_removable : disk.is_removable(),
|
||||
is_readonly : disk.is_read_only()
|
||||
};
|
||||
buffer.push(disk);
|
||||
});
|
||||
buffer
|
||||
}
|
||||
|
||||
/// # Fn `get_all_metrics_process`
|
||||
/// ## for gathering all process' metrics
|
||||
///
|
||||
/// *input* : `Arc<Process>`, `Arc<System>`
|
||||
///
|
||||
/// *output* : `ProcessMetrics`
|
||||
///
|
||||
/// *initiator* : main thread ??
|
||||
///
|
||||
/// *managing* : two ref counters to `Process` and `System`
|
||||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
#[allow(dead_code)]
|
||||
async fn get_all_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> ProcessMetrics {
|
||||
let metrics = join!(
|
||||
get_cpu_metrics_process(proc.clone()),
|
||||
get_ram_metrics_process(proc.clone(), sys.clone())
|
||||
);
|
||||
ProcessMetrics::new(
|
||||
proc.name().to_str().unwrap_or("unknown"),
|
||||
metrics.0,
|
||||
metrics.1
|
||||
)
|
||||
async fn get_all_ifaces_metrics() -> Ifaces {
|
||||
let mut ifaces = Ifaces::new();
|
||||
let networks = Networks::new_with_refreshed_list();
|
||||
networks.iter()
|
||||
.for_each(|(iface_name, data)| {
|
||||
let mac = data.mac_address().to_string();
|
||||
let iface = Network {
|
||||
iname : iface_name.to_owned(),
|
||||
mac : mac,
|
||||
recieved : data.received(),
|
||||
transmitted : data.transmitted(),
|
||||
total_recieved_bytes : data.total_received(),
|
||||
total_transmitted_bytes : data.total_transmitted(),
|
||||
total_recieved_packets : data.total_packets_received(),
|
||||
total_transmitted_packets : data.total_packets_transmitted(),
|
||||
errors_on_recieved : data.errors_on_received(),
|
||||
errors_on_transmitted : data.errors_on_transmitted(),
|
||||
};
|
||||
ifaces.push(iface);
|
||||
});
|
||||
ifaces
|
||||
}
|
||||
|
||||
/// # Fn `get_cpu_metrics_process`
|
||||
/// ## for gathering process cpu metrics
|
||||
///
|
||||
/// *input* : `Arc<Process>`
|
||||
///
|
||||
/// *output* : `f32`
|
||||
///
|
||||
/// *initiator* : main thread ??
|
||||
///
|
||||
/// *managing* : ref counter to `Process` object
|
||||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
async fn get_cpu_metrics_process(proc: Arc<Process>) -> f32 {
|
||||
proc.cpu_usage()
|
||||
}
|
||||
|
||||
/// # Fn `get_ram_metrics_process`
|
||||
/// ## for gathering process ram metrics
|
||||
///
|
||||
/// *input* : `Arc<Process>`
|
||||
///
|
||||
/// *output* : `f32`
|
||||
///
|
||||
/// *initiator* : main thread ??
|
||||
///
|
||||
/// *managing* : ref counter to `Process` object
|
||||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
async fn get_ram_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> f32 {
|
||||
(proc.memory() as f64 / sys.total_memory() as f64) as f32 * 100.0 as f32
|
||||
}
|
||||
async fn get_all_processes_metrics(system: &mut System) {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod metrics_unittets {
|
||||
|
|
|
|||
|
|
@ -6,17 +6,39 @@ use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUni
|
|||
use std::collections::HashSet;
|
||||
use tokio::sync::mpsc::Receiver as MpscReciever;
|
||||
use async_trait::async_trait;
|
||||
use serde::Serialize;
|
||||
|
||||
pub mod v2 {
|
||||
use log::info;
|
||||
use tokio::time::sleep;
|
||||
use crate::options::structs::DependencyType;
|
||||
use std::path::Path;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Serialize, Clone, Copy)]
|
||||
pub struct Pid(u32);
|
||||
|
||||
impl std::fmt::Display for Pid {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
return write!(f, "{}", self.0);
|
||||
}
|
||||
}
|
||||
|
||||
impl Pid {
|
||||
fn new() -> Self {
|
||||
Pid(0)
|
||||
}
|
||||
#[allow(unused)]
|
||||
pub fn new_sysinfo_pid(&self) -> sysinfo::Pid {
|
||||
sysinfo::Pid::from_u32(self.0 as u32)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ProcessesController {
|
||||
name: Arc<str>,
|
||||
pub name: Arc<str>,
|
||||
pub pid : Pid,
|
||||
bin: String,
|
||||
// obj: Arc<TrackingProcess>,
|
||||
state: ProcessState,
|
||||
|
|
@ -31,19 +53,26 @@ pub mod v2 {
|
|||
}
|
||||
|
||||
impl ProcessesController {
|
||||
#[inline(always)]
|
||||
pub fn new(name: &str, event_reader: MpscReciever<Events>) -> ProcessesController {
|
||||
ProcessesController {
|
||||
name : Arc::from(name),
|
||||
pid : Pid::new(),
|
||||
bin : String::new(),
|
||||
state : ProcessState::Stopped,
|
||||
event_reader,
|
||||
negative_events : HashSet::new(),
|
||||
}
|
||||
}
|
||||
#[inline(always)]
|
||||
pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController {
|
||||
self.bin = bin.as_ref().to_string_lossy().into_owned();
|
||||
self
|
||||
}
|
||||
#[allow(unused)]
|
||||
pub fn get_pid(&self) -> Pid {
|
||||
self.pid
|
||||
}
|
||||
|
||||
async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
|
||||
match trigger {
|
||||
|
|
@ -53,46 +82,158 @@ pub mod v2 {
|
|||
"stop" => {
|
||||
if is_active(&self.name).await {
|
||||
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
|
||||
terminate_process(&self.name).await;
|
||||
self.state = ProcessState::Stopped;
|
||||
match terminate_process(&self.name).await {
|
||||
Ok(_) => {
|
||||
info!("Process {} was stopped ...", &self.name);
|
||||
self.state = ProcessState::Stopped;
|
||||
self.pid = Pid::new();
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Cannot stop process {} : {}", self.name, er);
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
"user-stop" => {
|
||||
if is_active(&self.name).await {
|
||||
info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name);
|
||||
match terminate_process(&self.name).await {
|
||||
Ok(_) => {
|
||||
info!("Process {} was forcefully stopped ...", &self.name);
|
||||
self.state = ProcessState::StoppedByCli;
|
||||
self.pid = Pid::new();
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Cannot forcefully stop process {} : {}", self.name, er);
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
"user-hold" => {
|
||||
if is_active(&self.name).await {
|
||||
info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name);
|
||||
match freeze_process(&self.name).await {
|
||||
Ok(_) => {
|
||||
info!("Process {} was forcefully frozen ...", &self.name);
|
||||
self.state = ProcessState::HoldingByCli;
|
||||
// self.pid = Pid::new();
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Cannot forcefully freeze process {} : {}", self.name, er);
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
"hold" => {
|
||||
if !is_frozen(&self.name).await {
|
||||
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
|
||||
freeze_process(&self.name).await;
|
||||
self.state = ProcessState::Holding;
|
||||
match freeze_process(&self.name).await {
|
||||
Ok(_) => {
|
||||
info!("Process {} was frozen ...", &self.name);
|
||||
self.state = ProcessState::Holding;
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Cannot freeze process {} : {}", self.name, er);
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
"restart" => {
|
||||
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
|
||||
let _ = restart_process(&self.name, &self.bin).await;
|
||||
let pid = restart_process(&self.name, &self.bin).await;
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
if let Ok(pid) = pid {
|
||||
self.pid = Pid(pid);
|
||||
info!("{}: New PID - {}", self.name, self.pid);
|
||||
}
|
||||
},
|
||||
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
|
||||
}
|
||||
tokio::time::sleep(Duration::from_micros(100)).await;
|
||||
}
|
||||
#[allow(unused)]
|
||||
pub async fn stop_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
terminate_process(&self.name).await?;
|
||||
self.state = ProcessState::StoppedByCli;
|
||||
self.pid = Pid::new();
|
||||
Ok(())
|
||||
}
|
||||
#[allow(unused)]
|
||||
pub async fn freeze_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
freeze_process(&self.name).await?;
|
||||
self.state = ProcessState::HoldingByCli;
|
||||
Ok(())
|
||||
}
|
||||
#[allow(unused)]
|
||||
pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
let pid = start_process(&self.name, &self.bin).await?;
|
||||
self.state = ProcessState::Pending;
|
||||
self.pid = Pid(pid);
|
||||
Ok(())
|
||||
}
|
||||
#[allow(unused)]
|
||||
pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
unfreeze_process(&self.name).await?;
|
||||
self.state = ProcessState::Pending;
|
||||
Ok(())
|
||||
}
|
||||
#[allow(unused)]
|
||||
pub async fn restart_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
let pid = restart_process(&self.name, &self.bin).await?;
|
||||
self.pid = Pid(pid);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProcessUnit for ProcessesController {
|
||||
async fn process(&mut self) {
|
||||
if self.negative_events.len() == 0 {
|
||||
match self.state {
|
||||
ProcessState::Holding => {
|
||||
let conditions = (is_active(&self.name).await, is_frozen(&self.name).await);
|
||||
let state = &self.state;
|
||||
match (state, conditions) {
|
||||
(ProcessState::Holding, (_, _)) => {
|
||||
info!("No negative dependecies events on {} frozen process. Unfreezing ...", self.name);
|
||||
if let Err(er) = unfreeze_process(&self.name).await {
|
||||
error!("Cannot unfreeze process {} : {}", self.name, er);
|
||||
} else {
|
||||
self.state = ProcessState::Pending;
|
||||
info!("Process {} was unfreezed", &self.name);
|
||||
}
|
||||
},
|
||||
(ProcessState::Stopped, (_, _)) => {
|
||||
info!("No negative dependecies events on stopped {} process. Starting ...", self.name);
|
||||
match start_process(&self.name, &self.bin).await {
|
||||
Ok(pid) => {
|
||||
self.state = ProcessState::Pending;
|
||||
self.pid = Pid(pid);
|
||||
info!("{}: New PID - {}", self.name, self.pid);
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Cannot start process {} : {}", self.name, er);
|
||||
},
|
||||
}
|
||||
},
|
||||
(ProcessState::Pending, (false, false)) => {
|
||||
info!("{} process was impermissibly stopped. Starting ...", self.name);
|
||||
match start_process(&self.name, &self.bin).await {
|
||||
Ok(pid) => {
|
||||
self.state = ProcessState::Pending;
|
||||
self.pid = Pid(pid);
|
||||
info!("{}: New PID - {}", self.name, self.pid);
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Cannot start process {} : {}", self.name, er);
|
||||
},
|
||||
}
|
||||
},
|
||||
(ProcessState::Pending, (true, true)) => {
|
||||
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
|
||||
if let Err(er) = unfreeze_process(&self.name).await {
|
||||
error!("Cannot unfreeze process {} : {}", self.name, er);
|
||||
} else {
|
||||
self.state = ProcessState::Pending;
|
||||
}
|
||||
},
|
||||
ProcessState::Stopped => {
|
||||
info!("No negative dependecies events on {} process. Starting ...", self.name);
|
||||
if let Err(er) = start_process(&self.name, &self.bin).await {
|
||||
error!("Cannot start process {} : {}", self.name, er);
|
||||
} else {
|
||||
self.state = ProcessState::Pending;
|
||||
info!("Process {} was unfreezed", &self.name);
|
||||
}
|
||||
},
|
||||
_ => {},
|
||||
|
|
@ -240,14 +381,11 @@ pub async fn is_frozen(name: &str) -> bool {
|
|||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
pub async fn terminate_process(name: &str) {
|
||||
pub async fn terminate_process(name: &str) -> anyhow::Result<()> {
|
||||
let _ = Command::new("pkill")
|
||||
.arg(name)
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to execute command 'pkill'");
|
||||
std::process::exit(101);
|
||||
});
|
||||
.arg(name)
|
||||
.output()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// # Fn `terminate_process`
|
||||
|
|
@ -263,14 +401,11 @@ pub async fn terminate_process(name: &str) {
|
|||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
pub async fn freeze_process(name: &str) {
|
||||
pub async fn freeze_process(name: &str) -> anyhow::Result<()> {
|
||||
let _ = Command::new("pkill")
|
||||
.args(["-STOP", name])
|
||||
.output()
|
||||
.unwrap_or_else(|_| {
|
||||
error!("Failed to freeze process");
|
||||
std::process::exit(101);
|
||||
});
|
||||
.output()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// # Fn `unfreeze_process`
|
||||
|
|
@ -306,8 +441,8 @@ pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> {
|
|||
///
|
||||
/// *depends on* : fn `start_process`, fn `terminate_process`
|
||||
///
|
||||
pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> {
|
||||
terminate_process(name).await;
|
||||
pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<u32> {
|
||||
terminate_process(name).await?;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
start_process(name, path).await
|
||||
}
|
||||
|
|
@ -325,18 +460,19 @@ pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> {
|
|||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> {
|
||||
pub async fn start_process(name: &str, path: &str) -> anyhow::Result<u32> {
|
||||
// let runsh = format!("{} {}", "exec", path);
|
||||
let mut command = Command::new(path);
|
||||
// command.arg(path);
|
||||
|
||||
match command.spawn() {
|
||||
Ok(_) => {
|
||||
Ok(child) => {
|
||||
let pid = child.id();
|
||||
warn!("Process {} is running now!", name);
|
||||
Ok(())
|
||||
Ok(pid)
|
||||
}
|
||||
Err(er) => {
|
||||
Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er)))
|
||||
Err(anyhow::Error::msg(format!("Cannot start process {} : {}", name, er)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -384,6 +520,7 @@ mod process_unittests {
|
|||
let res1 = start_process("freeze-check", "./tests/examples/freeze-check").await;
|
||||
assert!(res1.is_ok());
|
||||
assert!(!is_frozen("freeze-check").await);
|
||||
let _ = terminate_process("freeze-check").await;
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn pidof_active_process() {
|
||||
|
|
|
|||
|
|
@ -1,12 +1,14 @@
|
|||
use crate::options::structs::CustomError;
|
||||
use log::{error, warn};
|
||||
use std::net::{TcpStream, ToSocketAddrs};
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Duration;
|
||||
use tokio::sync::mpsc::Sender as Sender;
|
||||
use async_trait::async_trait;
|
||||
use std::pin::Pin;
|
||||
use futures::future::Future;
|
||||
|
||||
pub mod v2 {
|
||||
use futures::FutureExt;
|
||||
use log::info;
|
||||
|
||||
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
|
||||
|
|
@ -42,6 +44,7 @@ pub mod v2 {
|
|||
}
|
||||
|
||||
impl ServicesController {
|
||||
#[inline(always)]
|
||||
pub fn new() -> ServicesController {
|
||||
ServicesController {
|
||||
name : String::new(),
|
||||
|
|
@ -51,6 +54,7 @@ pub mod v2 {
|
|||
event_registrator : EventHandlers::new(),
|
||||
}
|
||||
}
|
||||
#[inline(always)]
|
||||
pub fn with_access_name(
|
||||
mut self,
|
||||
hostname: &str,
|
||||
|
|
@ -60,7 +64,7 @@ pub mod v2 {
|
|||
self.access_url = Arc::from(access_url);
|
||||
self
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn with_params(
|
||||
mut self,
|
||||
conn_queue: ConnectionQueue,
|
||||
|
|
@ -95,20 +99,52 @@ pub mod v2 {
|
|||
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
|
||||
}
|
||||
async fn check_state(&self) -> anyhow::Result<()> {
|
||||
let mut addrs = self.access_url.to_socket_addrs()?;
|
||||
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
|
||||
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
|
||||
let url = self.access_url.clone();
|
||||
let resolve_future = tokio::task::spawn_blocking(move || {
|
||||
url.to_socket_addrs()
|
||||
});
|
||||
let addrs: Vec<_> = match tokio::time::timeout(Duration::from_secs(1), resolve_future).await {
|
||||
Ok(Ok(addrs)) => addrs?.collect(),
|
||||
Ok(Err(er)) => return Err(er.into()),
|
||||
Err(_) => return Err(anyhow::Error::msg("DNS resolution timeout")),
|
||||
};
|
||||
|
||||
if addrs.is_empty() {
|
||||
return Err(anyhow::Error::msg("No addresses resolved"));
|
||||
}
|
||||
|
||||
let tasks: Vec<_> = addrs.into_iter().map(|addr| async move {
|
||||
match tokio::time::timeout(Duration::from_secs(2), tokio::net::TcpStream::connect(&addr)).await {
|
||||
Ok(Ok(_)) => Some(addr),
|
||||
_ => None,
|
||||
}
|
||||
}).collect();
|
||||
let mut any_success = false;
|
||||
for task in futures::future::join_all(tasks).await {
|
||||
if task.is_some() {
|
||||
any_success = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !any_success {
|
||||
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn trigger_on(&mut self) {
|
||||
match self.state {
|
||||
ServiceState::Ok => {
|
||||
let _ = self.event_registrator
|
||||
.iter()
|
||||
.map(|(_, (_, el))| async {
|
||||
let _ = el.send(Events::Positive(self.access_url.clone())).await;
|
||||
});
|
||||
let futures : Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self.event_registrator.iter()
|
||||
.map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt)))
|
||||
.map(|(prc, (serv, sender_opt))| async move {
|
||||
info!("Notifying process {} ...", prc);
|
||||
let _ = sender_opt.send(Events::Positive(serv.clone())).await;
|
||||
})
|
||||
.map(|fut| fut.boxed())
|
||||
.collect();
|
||||
|
||||
futures::future::join_all(futures).await;
|
||||
},
|
||||
ServiceState::Unavailable => {
|
||||
// looped check and notifying
|
||||
|
|
@ -123,7 +159,6 @@ pub mod v2 {
|
|||
let timer = tokio::time::Instant::now();
|
||||
let mut attempt: u32 = 1;
|
||||
let access_url = Arc::new(self.access_url.clone());
|
||||
// let event_registrator = &mut self.event_registrator;
|
||||
|
||||
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
|
||||
// let access_url = access_url.clone();
|
||||
|
|
@ -137,11 +172,22 @@ pub mod v2 {
|
|||
if state_check_result.is_ok() {
|
||||
info!("Connection to {} is `OK` now", &access_url);
|
||||
self.state = ServiceState::Ok;
|
||||
let futures : Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self.event_registrator.iter()
|
||||
.map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt)))
|
||||
.map(|(prc, (serv, sender_opt))| async move {
|
||||
info!("Notifying process {} ...", prc);
|
||||
let _ = sender_opt.send(Events::Positive(serv.clone()));
|
||||
})
|
||||
.map(|fut| fut.boxed())
|
||||
.collect();
|
||||
|
||||
futures::future::join_all(futures).await;
|
||||
break;
|
||||
} else {
|
||||
let now = timer.elapsed();
|
||||
|
||||
let iterator = self.config.iter()
|
||||
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
|
||||
.filter(|(&wait, _)| tokio::time::Duration::from_secs(wait as u64) <= now)
|
||||
.flat_map(|(_, a)| a.iter().cloned())
|
||||
.collect::<VecDeque<Arc<str>>>();
|
||||
|
||||
|
|
@ -150,7 +196,7 @@ pub mod v2 {
|
|||
info!("Trying to notify process `{}` ...", &proc_name);
|
||||
let sender_opt = self.event_registrator.get(&name)
|
||||
.map(|(trigger, sender)|
|
||||
(trigger.to_service_negative_event(name.clone()), sender)
|
||||
(trigger.to_service_negative_event(self.access_url.clone()), sender)
|
||||
);
|
||||
|
||||
if let Some((tr, tx)) = sender_opt {
|
||||
|
|
@ -178,7 +224,7 @@ pub mod v2 {
|
|||
self.trigger_on().await;
|
||||
},
|
||||
(ServiceState::Ok, Err(_)) => {
|
||||
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
|
||||
warn!("Unreachable for connection service `{}`. Initializing reconnect mechanism ...", &self.access_url);
|
||||
self.state = ServiceState::Unavailable;
|
||||
self.trigger_on().await;
|
||||
},
|
||||
|
|
@ -189,168 +235,15 @@ pub mod v2 {
|
|||
}
|
||||
}
|
||||
|
||||
/// # Fn `service_handler`
|
||||
/// ## function to realize mechanism of current process' dep services monitoring
|
||||
///
|
||||
/// *input* : `&str`, `&Vec<Services>`, `Arc<mpsc::Sender<u8>>`
|
||||
///
|
||||
/// *output* : ()
|
||||
///
|
||||
/// *initiator* : fn `utils::running_handler`
|
||||
///
|
||||
/// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer
|
||||
///
|
||||
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
|
||||
///
|
||||
// pub async fn service_handler(
|
||||
// name: &str,
|
||||
// services: &Vec<Services>,
|
||||
// tx: Arc<mpsc::Sender<u8>>,
|
||||
// ) -> Result<(), CustomError> {
|
||||
// // println!("service daemon on {}", name);
|
||||
// for serv in services {
|
||||
// if check_service(&serv.hostname, &serv.port).await.is_err() {
|
||||
// if !is_active(name).await || is_frozen(name).await {
|
||||
// return Err(CustomError::Fatal);
|
||||
// }
|
||||
// error!(
|
||||
// "Service {}:{} is unreachable for process {}",
|
||||
// &serv.hostname, &serv.port, &name
|
||||
// );
|
||||
// match serv.triggers.on_lost.as_str() {
|
||||
// "stay" => {
|
||||
// tx.send(4).await.unwrap();
|
||||
// continue;
|
||||
// }
|
||||
// "stop" => {
|
||||
// if looped_service_connecting(name, serv).await.is_err() {
|
||||
// tx.send(5).await.unwrap();
|
||||
// tokio::task::yield_now().await;
|
||||
// return Err(CustomError::Fatal);
|
||||
// }
|
||||
// }
|
||||
// "hold" => {
|
||||
// // if is_frozen(name).await {
|
||||
// // return Err(CustomError::Fatal);
|
||||
// // }
|
||||
// if looped_service_connecting(name, serv).await.is_err() {
|
||||
// tx.send(6).await.unwrap();
|
||||
// tokio::task::yield_now().await;
|
||||
// return Err(CustomError::Fatal);
|
||||
// }
|
||||
// }
|
||||
// _ => {
|
||||
// tx.send(101).await.unwrap();
|
||||
// return Err(CustomError::Fatal);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
/// # Fn `looped_service_connecting`
|
||||
/// ## for service's state check in loop (with delay and restriction of attempts)
|
||||
///
|
||||
/// *input* : `&str`, `&Services`
|
||||
///
|
||||
/// *output* : Ok(()) if service now available | Err(er) if still not
|
||||
///
|
||||
/// *initiator* : fn `service_handler`
|
||||
///
|
||||
/// *managing* : process name, current service struct
|
||||
///
|
||||
/// *depends on* : fn `check_service`
|
||||
///
|
||||
// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
|
||||
// if serv.triggers.wait == 0 {
|
||||
// loop {
|
||||
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||
// warn!(
|
||||
// "Attempting to connect from {} process to {}:{}",
|
||||
// &name, &serv.hostname, &serv.port
|
||||
// );
|
||||
// match check_service(&serv.hostname, &serv.port).await {
|
||||
// Ok(_) => {
|
||||
// log::info!(
|
||||
// "Successfully connected to {} from {} process!",
|
||||
// &serv.hostname,
|
||||
// &name
|
||||
// );
|
||||
// break;
|
||||
// }
|
||||
// Err(_) => {
|
||||
// tokio::task::yield_now().await;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// Ok(())
|
||||
// } else {
|
||||
// let start = Instant::now();
|
||||
// while start.elapsed().as_secs() < serv.triggers.wait.into() {
|
||||
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||
// warn!(
|
||||
// "Attempting to connect from {} process to {}:{}",
|
||||
// &name, &serv.hostname, &serv.port
|
||||
// );
|
||||
// match check_service(&serv.hostname, &serv.port).await {
|
||||
// Ok(_) => {
|
||||
// log::info!(
|
||||
// "Successfully connected to {} from {} process!",
|
||||
// &serv.hostname,
|
||||
// &name
|
||||
// );
|
||||
// return Ok(());
|
||||
// }
|
||||
// Err(_) => {
|
||||
// tokio::task::yield_now().await;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// Err(CustomError::Fatal)
|
||||
// }
|
||||
// }
|
||||
|
||||
/// # Fn `check_service`
|
||||
/// ## for check current service's availiability
|
||||
///
|
||||
/// *input* : `&str`, `&u32`
|
||||
///
|
||||
/// *output* : Ok(()) if service now available | Err(er) if still not
|
||||
///
|
||||
/// *initiator* : fn `service_handler`, fn `looped_service_connecting`
|
||||
///
|
||||
/// *managing* : hostname, port
|
||||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
// ! have to be rewritten
|
||||
// todo: rewrite use
|
||||
async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
|
||||
let addr = format!("{}:{}", hostname, port);
|
||||
|
||||
match addr.to_socket_addrs() {
|
||||
Ok(mut addrs) => {
|
||||
if addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(CustomError::Fatal)
|
||||
}
|
||||
}
|
||||
Err(_) => Err(CustomError::Fatal),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod service_unittests {
|
||||
use super::check_service;
|
||||
#[tokio::test]
|
||||
async fn check_available_service() {
|
||||
assert!(check_service("ya.ru", &443).await.is_ok());
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn check_unavailable_service() {
|
||||
assert!(check_service("unavailable.service", &1111).await.is_err());
|
||||
}
|
||||
// use super::check_service;
|
||||
// #[tokio::test]
|
||||
// async fn check_available_service() {
|
||||
// assert!(check_service("ya.ru", &443).await.is_ok());
|
||||
// }
|
||||
// #[tokio::test]
|
||||
// async fn check_unavailable_service() {
|
||||
// assert!(check_service("unavailable.service", &1111).await.is_err());
|
||||
// }
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue