feature/configv2 #41

Merged
VladislavD merged 44 commits from feature/configv2 into rc 2025-05-12 09:51:07 +03:00
18 changed files with 1726 additions and 554 deletions

1
.gitignore vendored
View File

@ -4,3 +4,4 @@
Cargo.lock Cargo.lock
hagent_test.sock hagent_test.sock
release release
*.sock

View File

@ -1,6 +1,6 @@
[package] [package]
name = "noxis-cli" name = "noxis-cli"
version = "0.2.4" version = "0.2.7"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -2,11 +2,17 @@ use clap::{Parser, Subcommand};
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
pub struct Cli { pub struct Cli {
#[arg(
short,
default_value="noxis-rs.sock",
help="explicit specify of NOXIS Socket file"
)]
pub socket : String,
#[command( #[command(
subcommand, subcommand,
help = "to manage Noxis work", help = "to manage Noxis work",
)] )]
command : Commands, pub command : Commands,
} }
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)] #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
@ -50,13 +56,13 @@ pub struct StartAction {
num_args = 1.., num_args = 1..,
value_delimiter = ' ' value_delimiter = ' '
)] )]
flags : Vec<String>, pub flags : Vec<String>,
} }
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
pub struct ConfigCommand { pub struct ConfigCommand {
#[command(subcommand)] #[command(subcommand)]
action : ConfigAction, pub action : ConfigAction,
} }
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)] #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
@ -83,12 +89,12 @@ pub struct LocalConfig {
action, action,
help = "to read following input as JSON", help = "to read following input as JSON",
)] )]
is_json : bool, pub is_json : bool,
// value // value
#[arg( #[arg(
help = "path to config file or config String (with --json flag)", help = "path to config file or config String (with --json flag)",
)] )]
config : String, pub config : String,
} }
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
@ -96,16 +102,16 @@ pub struct ProcessCommand {
#[arg( #[arg(
help = "name of needed process", help = "name of needed process",
)] )]
process : String, pub process : String,
#[command( #[command(
subcommand, subcommand,
help = "To get current process's status", help = "To get current process's status",
)] )]
action : ProcessAction, pub action : ProcessAction,
} }
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)] #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
enum ProcessAction { pub enum ProcessAction {
#[command( #[command(
about = "To get info about current process status", about = "To get info about current process status",
)] )]

View File

@ -1,14 +1,15 @@
use thiserror::Error; use thiserror::Error;
use super::cli_net::NOXIS_RS_CREDS;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum NoxisCliError { pub enum NoxisCliError {
#[error("Can't send any data to {:?}. Noxis-rs daemon is disabled or can't be accessed", NOXIS_RS_CREDS)] #[error("Can't find socket `{0}`. Error : {1}")]
NoxisDaemonMissing, NoxisDaemonMissing(String, String),
#[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's web-functionality")] #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")]
PortIsNotWritable, PortIsNotWritable,
#[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")] #[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")]
CliPromptCanNotBeSent, CliPromptCanNotBeSent,
#[error("Can't parse CLI struct and send as byte stream")] #[error("Can't parse CLI struct and send as byte stream")]
ToStringCliParsingParsing, ToStringCliParsingParsing,
#[error("Can't read Noxis response due to {0}")]
CliResponseReadError(String)
} }

View File

@ -1,32 +1,30 @@
use tokio::net::TcpStream; use tokio::net::UnixStream;
use tokio::io::AsyncWriteExt; use tokio::io::{AsyncWriteExt, AsyncReadExt};
use tokio::time::{Duration, sleep}; use tokio::time::{Duration, sleep};
use anyhow::Result; use anyhow::Result;
use super::Cli; use super::Cli;
use super::cli_error::NoxisCliError; use super::cli_error::NoxisCliError;
pub const NOXIS_RS_CREDS: &str = "127.0.0.1:7753"; async fn create_us_stream(cli: &Cli) -> Result<UnixStream> {
Ok(UnixStream::connect(&cli.socket).await.map_err(|er| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string(), er.to_string()))?)
pub async fn create_tcp_stream() -> Result<TcpStream> {
Ok(TcpStream::connect(NOXIS_RS_CREDS).await.map_err(|_| NoxisCliError::NoxisDaemonMissing)?)
} }
pub async fn try_send(stream: Result<TcpStream>, params: Cli) -> Result<()> { pub async fn try_send(cli: Cli) -> Result<()> {
use serde_json::to_string; // let stream = create_us_stream(&cli).await;
let mut stream = stream.map_err(|_| NoxisCliError::NoxisDaemonMissing)?; let mut stream = create_us_stream(&cli).await?;
loop {
if stream.writable().await.is_err() {
sleep(Duration::from_millis(100)).await;
continue;
}
// let msg: Cli = from_str(&format!("{:?}", params))?;
let msg= to_string(&params).map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
// let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@";
stream.write_all(msg.as_bytes()).await.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?; let msg = serde_json::to_vec(&cli)
// ... .map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
break;
} stream.write_all(&msg)
.await
.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
let mut response = [0; 1024];
stream.read(&mut response)
.await
.map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?;
println!("Received response: {}", String::from_utf8_lossy(&response));
Ok(()) Ok(())
} }

View File

@ -4,12 +4,12 @@ mod cli_error;
use clap::Parser; use clap::Parser;
use cli::Cli; use cli::Cli;
use cli_net::{create_tcp_stream, try_send}; use cli_net::try_send;
use anyhow::Result; use anyhow::Result;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()>{ async fn main() -> Result<()>{
let cli = Cli::parse(); let cli = Cli::parse();
try_send(create_tcp_stream().await, cli).await?; try_send(cli).await?;
Ok(()) Ok(())
} }

View File

@ -1,6 +1,6 @@
[package] [package]
name = "noxis-rs" name = "noxis-rs"
version = "0.11.10" version = "0.11.26"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
@ -8,13 +8,15 @@ anyhow = "1.0.93"
chrono = "0.4.38" chrono = "0.4.38"
clap = { version = "4.5.21", features = ["derive"] } clap = { version = "4.5.21", features = ["derive"] }
env_logger = "0.11.3" env_logger = "0.11.3"
inotify = "0.10.2" inotify = "0.11.0"
log = "0.4.22" log = "0.4.22"
pcap = "2.2.0" pcap = "2.2.0"
redis = "0.25.4" redis = "0.29.2"
serde = { version = "1.0.203", features = ["derive"] } serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.118" serde_json = "1.0.118"
sysinfo = "0.32.0" sysinfo = "0.32.0"
tokio = { version = "1.38.0", features = ["full", "time"] } tokio = { version = "1.38.0", features = ["full", "time"] }
noxis-cli = { path = "../noxis-cli" } noxis-cli = { path = "../noxis-cli" }
dotenv = "0.15.0" dotenv = "0.15.0"
futures = "0.3.31"
async-trait = "0.1.88"

View File

@ -1,6 +1,6 @@
{ {
"dateOfCreation": "1721381809104", "dateOfCreation": "1721381809112",
"configServer": "localhost", "configServer": "192.168.2.37",
"processes": [ "processes": [
{ {
"name": "temp-process", "name": "temp-process",
@ -12,7 +12,7 @@
"src": "./tests/examples/", "src": "./tests/examples/",
"triggers": { "triggers": {
"onDelete": "stop", "onDelete": "stop",
"onChange": "stay" "onChange": "restart"
} }
} }
], ],
@ -22,8 +22,7 @@
"port": 443, "port": 443,
"triggers": { "triggers": {
"wait": 10, "wait": 10,
"delay": 2, "onLost": "restart"
"onLost": "hold"
} }
} }
] ]

View File

@ -1,7 +1,6 @@
mod options; mod options;
mod utils; mod utils;
use anyhow::Error;
use clap::Parser; use clap::Parser;
use log::{error, info}; use log::{error, info};
use options::config::*; use options::config::*;
@ -14,84 +13,140 @@ use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use utils::*; use utils::*;
use options::preboot::PrebootParams; use options::preboot::PrebootParams;
use tokio::sync::{broadcast, oneshot};
use options::config::v2::init_config_mechanism;
use utils::v2::init_monitoring;
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()>{ async fn main() -> anyhow::Result<()>{
let preboot = Arc::new(PrebootParams::parse().validate()?); let preboot = Arc::new(PrebootParams::parse().validate()?);
let _ = setup_logger(); let _ = setup_logger();
info!("Runner is configurating..."); info!("Noxis is configurating...");
//
// setting up redis connection \ let (tx_brd, mut rx_brd) = broadcast::channel::<Processes>(1);
// then conf checks to choose the most actual \ // cli <-> config
let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| { let (tx_oneshot, rx_oneshot) = oneshot::channel::<Processes>();
error!("No actual configuration for runner. Stopping...");
std::process::exit(1);
});
info!(
"Current runner configuration: {}",
&processes.date_of_creation
);
info!("Runner is ready. Initializing...");
if processes.processes.is_empty() {
error!("Processes list is null, runner-rs initialization is stopped");
return Err(Error::msg("Empty processes segment in config"));
}
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![]; let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// is in need to send to the signals handler thread
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
for proc in processes.processes.iter() { // initilaizing task for config manipulations
info!( let config_module = tokio::spawn(async move {
"Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", let _ = init_config_mechanism(
proc.name, rx_oneshot,
proc.path, tx_brd,
proc.dependencies.files.len(), preboot.clone()
proc.dependencies.services.len() ).await;
); });
handler.push(config_module);
// creating msg channel
// can or should be executed in new thread // initilaizing task for cli manipulation
let (tx, mut rx) = mpsc::channel::<u8>(1); let cli_module = tokio::spawn(async move {
let proc = Arc::new(proc.clone()); if let Err(er) = init_cli_pipeline().await {
let tx = Arc::new(tx.clone()); error!("CLI pipeline failed due to {}", er)
senders.push(Arc::clone(&tx.clone()));
let event = tokio::spawn(async move {
run_daemons(proc.clone(), tx.clone(), &mut rx).await;
});
handler.push(event);
}
// destructor addition
handler.push(tokio::spawn(async move {
if set_valid_destructor(Arc::new(senders)).await.is_err() {
error!("Linux signals handler creation failed. Terminating main thread...");
return;
} }
});
handler.push(cli_module);
tokio::time::sleep(Duration::from_millis(200)).await; // initilaizing task for deinitializing `Noxis`
info!("End of job. Terminating main thread..."); let ctrlc = tokio::spawn(async move {
if let Err(er) = set_valid_destructor(vec![].into()).await {
error!("Destructor mod failed due to {}", er);
}
std::process::exit(0); std::process::exit(0);
})); });
handler.push(ctrlc);
// remote config update subscription let monitoring = tokio::spawn(async move {
handler.push(tokio::spawn(async move { let config = {
let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await; let mut tick = tokio::time::interval(Duration::from_millis(500));
})); loop {
tick.tick().await;
// cli pipeline break match rx_brd.try_recv() {
handler.push(tokio::spawn(async move { Ok(conf) => conf,
let _ = init_cli_pipeline().await; Err(_) => continue,
})); }
}
};
if let Err(er) = init_monitoring(config).await {
error!("Monitoring mod failed due to {}", er);
}
});
handler.push(monitoring);
for i in handler { for i in handler {
let _ = i.await; let _ = i.await;
} }
// setting up redis connection \
// then conf checks to choose the most actual \
// let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
// error!("No actual configuration for runner. Stopping...");
// std::process::exit(1);
// });
//
// info!(
// "Current runner configuration: {}",
// &processes.date_of_creation
// );
// info!("Runner is ready. Initializing...");
//
// if processes.processes.is_empty() {
// error!("Processes list is null, runner-rs initialization is stopped");
// return Err(Error::msg("Empty processes segment in config"));
// }
// let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// // is in need to send to the signals handler thread
// let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
//
// for proc in processes.processes.iter() {
// info!(
// "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
// proc.name,
// proc.path,
// proc.dependencies.files.len(),
// proc.dependencies.services.len()
// );
//
// // creating msg channel
// // can or should be executed in new thread
// let (tx, mut rx) = mpsc::channel::<u8>(1);
// let proc = Arc::new(proc.clone());
// let tx = Arc::new(tx.clone());
//
// senders.push(Arc::clone(&tx.clone()));
//
// let event = tokio::spawn(async move {
// run_daemons(proc.clone(), tx.clone(), &mut rx).await;
// });
// handler.push(event);
// }
//
// // destructor addition
// handler.push(tokio::spawn(async move {
// if set_valid_destructor(Arc::new(senders)).await.is_err() {
// error!("Linux signals handler creation failed. Terminating main thread...");
// return;
// }
//
// tokio::time::sleep(Duration::from_millis(200)).await;
// info!("End of job. Terminating main thread...");
// std::process::exit(0);
// }));
//
// // remote config update subscription
// handler.push(tokio::spawn(async move {
// let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
// }));
//
// // cli pipeline
// handler.push(tokio::spawn(async move {
// let _ = init_cli_pipeline().await;
// }));
//
// for i in handler {
// let _ = i.await;
// }
Ok(()) Ok(())
} }

View File

@ -1,12 +1,9 @@
use log::{error, info, warn}; use log::{error, info};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{ UnixStream, UnixListener };
use anyhow::{Result as DynResult, Error};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}}; use std::fs;
// use std::io::BufReader; use tokio::io::{ AsyncWriteExt, AsyncReadExt};
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
use noxis_cli::Cli; use noxis_cli::Cli;
use serde_json::from_str;
/// # Fn `init_cli_pipeline` /// # Fn `init_cli_pipeline`
/// ## for catching all input requests from CLI /// ## for catching all input requests from CLI
@ -21,49 +18,32 @@ use serde_json::from_str;
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn init_cli_pipeline() -> DynResult<()> { pub async fn init_cli_pipeline() -> anyhow::Result<()> {
match init_listener().await { let socket_path = "noxis.sock";
Some(list) => { let _ = fs::remove_file(socket_path);
match UnixListener::bind(socket_path) {
Ok(list) => {
// TODO: remove `unwrap`s
info!("Listening on {}", socket_path);
loop { loop {
if let Ok((socket, addr)) = list.accept().await { match list.accept().await {
// isolation Ok((socket, _)) => {
if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() { // tokio::spawn();
warn!("Declined attempt to connect TCP-socket from {}", addr); process_connection(socket).await;
continue; },
} Err(er) => {
process_connection(socket).await; error!("Cannot poll connection to CLI due to {}", er);
sleep(Duration::from_millis(300)).await;
},
} }
sleep(Duration::from_millis(500)).await;
} }
// Ok(()) // Ok(())
}, },
None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use")) Err(er) => {
} error!("Failed to open UnixListener for CLI");
} Err(er.into())
/// # Fn `init_listener`
/// ## for creating TCP-listener for communicating with CLI
///
/// *input* : -
///
/// *output* : `Some<TcpListener>` if port 7753 was opened | None if not
///
/// *initiator* : fn `init_cli_pipeline`
///
/// *managing* : `TcpListener` object to handle requests
///
/// *depends on* : `tokio::net::TcpListener`
///
async fn init_listener() -> Option<TcpListener> {
match TcpListener::bind("127.0.0.1:7753").await {
Ok(listener) => {
info!("Runner is listening localhost:7753");
Some(listener)
}, },
Err(_) => {
error!("Cannot create TCP listener for CLI");
None
}
} }
} }
@ -80,27 +60,29 @@ async fn init_listener() -> Option<TcpListener> {
/// ///
/// *depends on* : `tokio::net::TcpStream` /// *depends on* : `tokio::net::TcpStream`
/// ///
async fn process_connection(mut stream: TcpStream) { async fn process_connection(mut stream: UnixStream) {
let buf_reader = BufReader::new(stream.borrow_mut()); let mut buf = vec![0; 1024];
let mut rqst = buf_reader.lines(); match stream.read(&mut buf).await {
Ok(0) => {
info!("Client disconnected ");
while let Ok(Some(line)) = rqst.next_line().await { },
if line.is_empty() { Ok(n) => {
break buf.truncate(n);
} info!("CLI have sent {} bytes", n);
match from_str::<Cli>(&line) { match serde_json::from_slice::<Cli>(&buf) {
Ok(req) => { Ok(cli) => {
// TODO: func wrapper info!("Received CLI request: {:?}", cli);
dbg!(req); let response = "OK";
}, if let Err(e) = stream.write_all(response.as_bytes()).await {
Err(_) => { error!("Failed to send response: {}", e);
break }
}, }
} Err(e) => {
println!("{}", line); error!("Failed to parse CLI request: {}", e);
}
}
},
Err(e) => error!("Failed to read from socket: {}", e),
} }
let _ = stream.shutdown().await;
let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!";
stream.write_all(response.as_bytes()).await.unwrap();
} }

View File

@ -9,9 +9,393 @@ use std::sync::Arc;
use std::{env, fs}; use std::{env, fs};
use super::preboot::PrebootParams; use super::preboot::PrebootParams;
use tokio::time::{Duration, sleep}; use tokio::time::{Duration, sleep};
// use redis::PubSub;
use tokio::sync::{
oneshot,
oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender },
broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver };
use crate::utils::files::create_watcher;
use std::fs::File;
use inotify::EventMask;
// const CONFIG_PATH: &str = "settings.json"; // const CONFIG_PATH: &str = "settings.json";
pub mod v2 {
use std::path::PathBuf;
use crate::utils::get_container_id;
use super::*;
pub async fn init_config_mechanism(
// to handle cli config changes
cli_oneshot: OneShotReciever<Processes>,
// to share local config with PRCS, CLI_PIPELINE and CONFIG modules
brd_tx : BroadcastSender<Processes>,
// preboot params (args)
params : Arc<PrebootParams>
/*...*/
) {
// channel for pubsub to handle local config pulling
let local_config_brd_reciever = brd_tx.subscribe();
// channel between pub-sub mech and local config mech
let (tx_pb_lc, rx_pb_lc) = oneshot::channel::<bool>();
// channel between cli mech and local config mech
let (tx_cli_lc, rx_cli_lc) = oneshot::channel::<bool>();
// dbg!("before lc");
let params_clone = params.clone();
let for_lc_path = params.clone();
let lc_path = for_lc_path
.config
.to_str()
.unwrap_or("settings.json");
// future to init work with local config
let lc_future = tokio::spawn(
// let params = params.clone();
local_config_reciever(
params_clone,
rx_pb_lc,
rx_cli_lc,
Arc::new(brd_tx)
)
);
// dbg!("before pb");
// future to init work with pub sub mechanism
let pubsub_future = tokio::spawn(
pubsub_config_reciever(
tx_pb_lc,
params.clone(),
local_config_brd_reciever
)
);
// dbg!("before cli");
// future to catch new configs from cli pipeline
let cli_future = tokio::spawn(
from_cli_config_reciever(
cli_oneshot,
tx_cli_lc
)
);
// let _ = lc_future.await;
// dbg!("before select");
tokio::select! {
lc_result = lc_future => {
// dbg!("end of lc");
match lc_result {
Ok(res) => {
if res.is_ok() {
info!("Local config warding mechanism stopped, waiting for others ...");
sleep(Duration::from_millis(500)).await;
let _ = restart_main_thread();
}
else {
error!("Local config warding mechanism crushed, restarting ...");
let _ = restart_main_thread();
}
},
Err(_) => {
error!("Local config warding mechanism crushed, restarting ...");
let _ = restart_main_thread();
},
}
},
pb_result = pubsub_future => {
match pb_result {
Ok(res) => {
if res.is_ok() {
info!("New config was saved locally, restarting ...");
}
else {
error!("Pubsub mechanism crushed, restarting ...");
}
},
Err(_) => {
error!("Pubsub mechanism crushed, restarting ...");
},
}
let _ = restart_main_thread();
},
cli_config_option = cli_future => {
match cli_config_option {
Err(_) => error!("CLI pulling new config mechanism crushed, restarting ..."),
Ok(option_config) => {
match option_config {
None => error!("CLI pulling new config mechanism crushed, restarting ..."),
Some(config) => {
info!("New config was pulled from CLI, saving and restarting ...");
let _ = save_new_config(&config, lc_path);
},
}
},
}
let _ = restart_main_thread();
},
}
// dbg!("after select");
// TODO! futures + select! [OK]
// TODO! tests config
}
pub async fn get_redis_connection(params: &str) -> Option<Connection> {
for i in 1..=3 {
let redis_url = format!("redis://{}/", params);
info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i);
if let Ok(client) = Client::open(redis_url) {
if let Ok(conn) = client.get_connection() {
info!("Successfully opened Redis connection");
return Some(conn);
}
}
error!("Error with subscribing Redis stream on update. Retrying in 5 secs...");
sleep(Duration::from_secs(5)).await;
}
None
}
// loop checking redis pubsub
async fn pubsub_config_reciever(
// to stop checking local config
local_conf_tx : OneShotSender<bool>,
params : Arc<PrebootParams>,
tx_brd_local : BroadcastReceiver<Processes>,
) -> anyhow::Result<()>{
/*...*/
// dbg!("start of pb");
let mut tx_brd_local = tx_brd_local;
let local_config = if !tx_brd_local.is_empty() {
tx_brd_local.recv().await?
} else {
// Processes::default()
let mut tick = tokio::time::interval(Duration::from_millis(500));
loop {
tick.tick().await;
break match tx_brd_local.recv().await {
Ok(conf) => conf,
Err(_) => continue,
};
}
};
match get_redis_connection(&local_config.config_server).await {
Some(mut conn) => {
let mut pub_sub = conn.as_pubsub();
let channel_name = get_container_id().unwrap_or(String::from("default"));
let channel_name = channel_name.trim();
match pub_sub.subscribe(channel_name) {
Err(er) => {
error!("Cannot subscribe pubsub channel due to {}", &er);
return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
},
Ok(_) => {
info!("Successfully subscribed to {} pubsub channel", channel_name);
let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3)));
loop {
if let Ok(msg) = pub_sub.get_message() {
// dbg!("ok on get message");
let payload : Result<String, _> = msg.get_payload();
match payload {
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
Ok(payload) => {
if let Some(remote) = parse_extern_config(&payload) {
match config_comparing(&local_config, &remote) {
ConfigActuality::Local => {
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
},
ConfigActuality::Remote => {
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
// to stop watching local config file mechanism
let _ = local_conf_tx.send(true);
let config_path = params.config.to_str().unwrap_or("settings.json");
if save_new_config(&remote, &config_path).is_err() {
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
return Err(anyhow::Error::msg(
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
))
}
return Ok(());
},
}
}
else {
warn!("Invalid config was pulled from Redis channel")
}
},
}
}
// delay
tokio::task::yield_now().await;
}
},
}
},
None => {
sleep(Duration::from_secs(20)).await;
}
}
Ok(())
}
//
async fn local_config_reciever(
params : Arc<PrebootParams>,
pubsub_oneshot : OneShotReciever<bool>,
cli_oneshot : OneShotReciever<bool>,
brd_tx : Arc<BroadcastSender<Processes>>,
/*...*/
) -> anyhow::Result<()> {
/*...*/
// shadowing as mut
let mut pubsub_oneshot = pubsub_oneshot;
let mut cli_oneshot = cli_oneshot;
// fill with default empty config, mut to change later
let mut _current_config = Processes::default();
// PathBuf to &str to work with local config path as slice
let local_config_path = params
.config
.to_str()
.unwrap_or("settings.json");
match load_processes(local_config_path) {
// if local exists
Some(conf) => {
info!("Local config `{}` was found.", &conf.date_of_creation);
_current_config = conf;
if let Err(er) = brd_tx.send(_current_config.clone()) {
error!("Cannot share local config with broadcast due to {}", er);
}
},
// if local is not exist
None => {
warn!("Local config wasn't found. Waiting for new ...");
return Err(anyhow::Error::msg("No local config"));
// ...
},
}
// 100% local exists here
// create watcher on local config file
match create_watcher("", local_config_path) {
Ok(mut watcher) => {
loop {
let mut need_to_export_config = false;
// let mut need_to_recreate_watcher = false;
// return situations here
// 1) oneshot signal
// 2) if config was deleted -> recreate and fill with current config that is held here
// 3) if config was changed -> fill with current config that is held here
// catching signal from pubsub
// it's because pubsub mech pulled new valid and actual config and now it's time to ...
// ... overwrite local config file and restart main thread
if let Ok(_) = pubsub_oneshot.try_recv() {
sleep(Duration::from_secs(1)).await;
return Ok(());
}
// catching signal from cli
// it's because cli mech pulled new valid and actual config and now it's time to ...
// ... overwrite local config file and restart main thread (like in previous mechanism)
if let Ok(_) = cli_oneshot.try_recv() {
sleep(Duration::from_secs(1)).await;
return Ok(());
}
// ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ...
// ! ... FROM WATCHER"S BUFFER
// existing check
if !params.config.exists() {
warn!("Local config file was deleted or moved. Recreating new one with saved data ...");
need_to_export_config = true;
// need_to_recreate_watcher = true;
} else {
// changes check
let mut buffer = [0; 128];
let events = watcher.read_events(&mut buffer);
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
if !events.is_empty() {
warn!("Local config file was overwritten. Discarding changes ...");
need_to_export_config = true;
// events
// .iter()
// .any(|event| *event == EventMask::DELETE_SELF)
// .then(|| need_to_recreate_watcher = true);
}
}
}
// exporting data
if need_to_export_config {
if let Err(er) = export_saved_config_data_locally(&params.config, &_current_config).await {
error!("Cannot save actual imported config due to {}", er);
} else {
// recreation watcher (draining activity buffer mechanism)
// if local config file was deleted and recreated
// if local config file was modified locally
match create_watcher("", local_config_path) {
Ok(new) => watcher = new,
Err(er) => error!("Cannot create new watcher due to {}", er),
}
}
}
sleep(Duration::from_millis(300)).await;
// tokio::task::yield_now().await;
}
},
Err(_) => {
error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path);
return Err(anyhow::Error::msg("Cannot create watcher on local config file"));
},
}
}
// [:IN-TEST]
async fn from_cli_config_reciever(
cli_oneshot: OneShotReciever<Processes>,
to_local_tx: OneShotSender<bool>
) -> Option<Processes> {
/* match awaits til channel*/
// dbg!("start of cli");
loop {
if !cli_oneshot.is_empty() {
match cli_oneshot.await {
Ok(config_from_cli) => {
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
let _ = to_local_tx.send(true);
return Some(config_from_cli)
},
_ => return None,
}
}
sleep(Duration::from_millis(300)).await;
}
}
async fn export_saved_config_data_locally(
config_file_path: &PathBuf,
current_config: &Processes
) -> anyhow::Result<()> {
let mut file = File::create(config_file_path)?;
file.write_all(
serde_json::to_string_pretty(current_config)?.as_bytes()
)?;
Ok(())
// Ok(())
}
}
/// # Fn `load_processes` /// # Fn `load_processes`
/// ## for reading and parsing *local* storing config /// ## for reading and parsing *local* storing config
/// ///
@ -54,14 +438,14 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
error!("Invalid character in config file. Config path was set to default"); error!("Invalid character in config file. Config path was set to default");
"settings.json" "settings.json"
}); });
info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url); info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url);
match load_processes(config_path) { match load_processes(config_path) {
Some(local_conf) => { Some(local_conf) => {
info!( info!(
"Found local configuration, version - {}", "Found local configuration, version - {}",
&local_conf.date_of_creation &local_conf.date_of_creation
); );
if !params.no_remote_config { if !params.no_sub {
if let Some(remote_conf) = if let Some(remote_conf) =
// TODO : rework with pubsub mech // TODO : rework with pubsub mech
once_get_remote_configuration(&format!("redis://{}/", &params.remote_server_url)) once_get_remote_configuration(&format!("redis://{}/", &params.remote_server_url))
@ -85,7 +469,7 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
} }
None => { None => {
warn!("No local valid conf was found. Trying to pull remote one..."); warn!("No local valid conf was found. Trying to pull remote one...");
if !params.no_remote_config { if !params.no_sub {
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", &params.remote_server_url))); let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", &params.remote_server_url)));
if let Some(conf) = get_remote_conf_watcher(&mut conn).await { if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
@ -322,7 +706,7 @@ fn restart_main_thread() -> std::io::Result<()> {
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> { pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
if params.no_sub || params.no_remote_config { if params.no_sub {
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) { if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) {
@ -397,6 +781,9 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<Pr
/// *depends on* : `Processes`, `ConfigActuality` /// *depends on* : `Processes`, `ConfigActuality`
/// ///
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
if local.is_default() {
return ConfigActuality::Remote;
}
let local_date: u64 = local.date_of_creation.parse().unwrap(); let local_date: u64 = local.date_of_creation.parse().unwrap();
let remote_date: u64 = remote.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap();

View File

@ -13,7 +13,7 @@ enum EnvVars {
NoxisNoHagent, NoxisNoHagent,
NoxisNoLogs, NoxisNoLogs,
NoxisRefreshLogs, NoxisRefreshLogs,
NoxisNoRemoteConfig, // NoxisNoRemoteConfig,
NoxisNoConfigSub, NoxisNoConfigSub,
NoxisSocketPath, NoxisSocketPath,
NoxisLogTo, NoxisLogTo,
@ -29,7 +29,7 @@ impl std::fmt::Display for EnvVars {
EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"), EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"),
EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"), EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"),
EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"), EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"),
EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"), // EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"),
EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"), EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"),
EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"), EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"),
EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"), EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"),
@ -48,7 +48,7 @@ impl<'a> EnvVars {
EnvVars::NoxisNoHagent => "false", EnvVars::NoxisNoHagent => "false",
EnvVars::NoxisNoLogs => "false", EnvVars::NoxisNoLogs => "false",
EnvVars::NoxisRefreshLogs => "false", EnvVars::NoxisRefreshLogs => "false",
EnvVars::NoxisNoRemoteConfig => "false", // EnvVars::NoxisNoRemoteConfig => "false",
EnvVars::NoxisNoConfigSub => "false", EnvVars::NoxisNoConfigSub => "false",
EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock", EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock",
EnvVars::NoxisLogTo => "./", EnvVars::NoxisLogTo => "./",
@ -77,7 +77,7 @@ impl<'a> EnvVars {
Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string()); Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string());
Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string()); Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string());
Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string()); Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string());
Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string()); // Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string());
Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string()); Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string());
Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap()); Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap());
Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap()); Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap());
@ -147,12 +147,6 @@ impl std::fmt::Display for MetricsPrebootParams {
/// noxis-rs ... --refresh-logs ... /// noxis-rs ... --refresh-logs ...
/// ``` /// ```
/// ///
/// `--no-remote-config` - to disable work with Redis as config producer
/// ### usage :
/// ``` bash
/// noxis-rs ... --no-remote-config ...
/// ```
///
/// `--no-sub` - to disable Redis subscribtion mechanism /// `--no-sub` - to disable Redis subscribtion mechanism
/// ### usage : /// ### usage :
/// ``` bash /// ``` bash
@ -212,17 +206,18 @@ pub struct PrebootParams {
help="To clear logs directory" help="To clear logs directory"
)] )]
pub refresh_logs : bool, pub refresh_logs : bool,
#[arg( // #[arg(
long = "no-remote-config", // long = "no-remote-config",
action, // action,
help="To disable work with remote config server", // help="To disable work with remote config server",
conflicts_with="no_sub")] // conflicts_with="no_sub")]
pub no_remote_config : bool, // pub no_remote_config : bool,
#[arg( #[arg(
long = "no-sub", long = "no-sub",
action, action,
help="To disable subscription mechanism", help="To disable Redis subscription mechanism",
conflicts_with="no_remote_config")] )]
// conflicts_with="no_remote_config"
pub no_sub : bool, pub no_sub : bool,
// params (socket_path, log_to, remote_server_url, config) // params (socket_path, log_to, remote_server_url, config)
@ -243,7 +238,7 @@ pub struct PrebootParams {
#[arg( #[arg(
long = "remote-server-url", long = "remote-server-url",
default_value="localhost", default_value="localhost",
conflicts_with="no_remote_config", conflicts_with="no_sub",
help = "To set url of remote config server using in remote config pulling mechanism" help = "To set url of remote config server using in remote config pulling mechanism"
)] )]
pub remote_server_url : String, pub remote_server_url : String,
@ -288,15 +283,17 @@ impl PrebootParams {
// existing log dir // existing log dir
if !self.log_to.exists() && !self.no_logs { if !self.log_to.exists() && !self.no_logs {
eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default"); eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default");
self.refresh_logs = false;
self.log_to = PathBuf::from("./"); self.log_to = PathBuf::from("./");
// return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start")); // return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
} }
// existing sock file // existing sock file
if !self.config.exists() { if !self.config.exists() {
eprintln!("Error: Invalid character in config file. Config path was set to default"); eprintln!("Error: Invalid character in config file. Config path was set to default");
let config = PathBuf::from("/etc/settings.json"); // TODO : ??? wtf is going with 2 paths
if !config.exists() && self.no_remote_config { let config = PathBuf::from("/etc/enode/noxis/settings.json");
return Err(Error::msg("Noxis cannot run without config. Create local config or enable remote-config mechanism")); if !config.exists() && self.no_sub {
return Err(Error::msg("Noxis cannot run without config. Create local config or enable pubsub mechanism"));
} }
self.config = PathBuf::from("settings.json"); self.config = PathBuf::from("settings.json");
// return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start")); // return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
@ -353,20 +350,20 @@ mod preboot_unitests{
"runner-rs", "runner-rs",
"--no-sub", "--no-sub",
"--remote-server-url", "redis://127.0.0.1" "--remote-server-url", "redis://127.0.0.1"
]).is_ok())
}
#[test]
fn parsing_config_invalid_args_noremote_nosub() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--no-remote-config", "--no-sub"
]).is_err()) ]).is_err())
} }
// #[test]
// fn parsing_config_invalid_args_noremote_nosub() {
// assert!(PrebootParams::try_parse_from(vec![
// "runner-rs",
// "--no-remote-config", "--no-sub"
// ]).is_err())
// }
#[test] #[test]
fn parsing_config_invalid_args_noremote_remoteurl() { fn parsing_config_invalid_args_noremote_remoteurl() {
assert!(PrebootParams::try_parse_from(vec![ assert!(PrebootParams::try_parse_from(vec![
"runner-rs", "runner-rs",
"--no-remote-config", "--no-sub",
"--remote-server-url", "redis://127.0.0.1" "--remote-server-url", "redis://127.0.0.1"
]).is_err()) ]).is_err())
} }

View File

@ -22,7 +22,7 @@ type SendersVec = Arc<Vec<Arc<mpsc::Sender<u8>>>>;
/// ///
/// *depends on* : Sig, Signals /// *depends on* : Sig, Signals
/// ///
pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError> { pub async fn set_valid_destructor(senders: SendersVec) -> anyhow::Result<()> {
let (mut int, mut term, mut stop) = ( let (mut int, mut term, mut stop) = (
Sig::new(Signals::Sigint, senders.clone()), Sig::new(Signals::Sigint, senders.clone()),
Sig::new(Signals::Sigterm, senders.clone()), Sig::new(Signals::Sigterm, senders.clone()),

View File

@ -2,7 +2,114 @@
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use std::sync::Arc;
#[derive(Debug)]
pub enum DependencyType {
File,
Service,
}
#[derive(Debug)]
pub enum ServiceState {
Ok,
Unavailable
}
pub struct ServiceWaitConfig(u32);
impl Default for ServiceWaitConfig {
fn default() -> Self {
Self(5)
}
}
pub enum FileTriggerType {
OnChange,
OnDelete,
}
impl std::fmt::Display for FileTriggerType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return match self {
FileTriggerType::OnChange => write!(f, "File was changed"),
FileTriggerType::OnDelete => write!(f, "File was moved or deleted"),
}
}
}
impl<'a> FileTriggerType {
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger)),
}
}
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger.on_delete.clone())),
}
}
}
#[derive(Debug)]
pub enum Triggers {
File { on_change: Arc<str>, on_delete: Arc<str> },
Service {on_lost: Arc<str>, wait: u32},
}
impl Triggers {
pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers {
Triggers::File { on_change, on_delete }
}
pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers {
Triggers::Service{on_lost, wait: wait_time}
}
pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> {
if let Triggers::Service { on_lost, .. } = self {
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone())))
}
None
}
}
#[derive(Debug)]
pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> }
pub struct ServiceTriggersForController(Arc<str>);
impl std::fmt::Display for DependencyType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return match self {
DependencyType::File => write!(f, "File"),
DependencyType::Service => write!(f, "Service"),
}
}
}
#[derive(Debug)]
pub enum ProcessState {
Pending,
Holding,
Stopped,
StoppedByCli,
}
#[derive(Debug)]
pub enum Events {
Positive(Arc<str>),
Negative(NegativeOutcomes)
}
#[derive(Debug)]
pub enum NegativeOutcomes {
FileWasChanged(Arc<str>, DependencyType, Arc<str>),
FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>),
ServiceIsUnreachable(Arc<str>, DependencyType, Arc<str>),
}
#[async_trait]
pub trait ProcessUnit {
async fn process(&mut self);
}
/// # an Error enum (next will be deleted and replaced) /// # an Error enum (next will be deleted and replaced)
pub enum CustomError { pub enum CustomError {
Fatal, Fatal,
@ -40,6 +147,22 @@ pub struct Processes {
pub processes: Vec<TrackingProcess>, pub processes: Vec<TrackingProcess>,
} }
impl Default for Processes {
fn default() -> Self {
Self {
date_of_creation : String::new(),
config_server : String::from("default"),
processes : Vec::new(),
}
}
}
impl Processes {
pub fn is_default(&self) -> bool {
self.date_of_creation.is_empty()
}
}
/// # Struct for the 2nd level in json conf file /// # Struct for the 2nd level in json conf file
/// ## for each process to contain info, such as name, path and dependencies /// ## for each process to contain info, such as name, path and dependencies
/// ///
@ -135,7 +258,7 @@ pub struct Files {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Services { pub struct Services {
pub hostname: String, pub hostname: String,
pub port: u32, pub port: Option<u32>,
pub triggers: ServiceTriggers, pub triggers: ServiceTriggers,
} }
@ -159,7 +282,6 @@ pub struct Services {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServiceTriggers { pub struct ServiceTriggers {
pub wait: u32, pub wait: u32,
pub delay: u32,
#[serde(rename = "onLost")] #[serde(rename = "onLost")]
pub on_lost: String, pub on_lost: String,
} }

View File

@ -6,25 +6,207 @@ pub mod services;
// TODO : saving current flags state // TODO : saving current flags state
use crate::options::structs::CustomError; use crate::options::structs::{CustomError, TrackingProcess, Processes};
use crate::options::structs::TrackingProcess; // use files::create_watcher;
use files::create_watcher; // use files::file_handler;
use files::file_handler; // use inotify::Inotify;
use inotify::Inotify; use log::{error, warn, info};
use log::{error, warn};
use prcs::{ use prcs::{
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process, freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
unfreeze_process, unfreeze_process,
}; };
use services::service_handler; // use services::service_handler;
use std::process::Command; use std::process::Command;
use std::sync::Arc; use std::sync::Arc;
use tokio::join; // use tokio::join;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::Duration; use tokio::time::Duration;
// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
// controllers import
use prcs::v2::ProcessesController;
use files::v2::FilesController;
use services::v2::ServicesController;
use async_trait::async_trait;
const GET_ID_CMD: &str = "hostname"; const GET_ID_CMD: &str = "hostname";
pub mod v2 {
use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
use super::*;
#[derive(Debug)]
enum ControllerResult {
Process(Option<ProcessesController>),
File(Option<FilesController>),
Service(Option<ServicesController>),
}
#[derive(Debug)]
struct Supervisor {
prcs : LinkedList<ProcessesController>,
files : LinkedList<FilesController>,
services : LinkedList<ServicesController>,
}
impl Supervisor {
pub fn new() -> Supervisor {
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
}
pub async fn with_config(mut self, config: &Processes) -> Supervisor {
let _ = config.processes.iter()
.for_each(|prc| {
let (rx, tx) = mpsc::channel::<Events>(10);
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
if !self.prcs.contains(&temp) {
self.prcs.push_back(temp);
}
let rx = Arc::new(rx);
let proc_name: Arc<str> = Arc::from(prc.name.clone());
let _ = prc.dependencies.files.iter()
.for_each(|file| {
let mut hm = HashMap::new();
let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())};
hm.insert(proc_name.clone(), (triggers, rx.clone()));
let tempfile = FilesController::new(&file.filename.as_str(), hm)
.with_path(&file.src);
if let Ok(file) = tempfile {
if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) {
current_file.add_event(file);
} else {
self.files.push_back(file);
}
}
});
// servs
let _ = prc.dependencies.services.iter()
.for_each(|serv| {
let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref());
// preparations
let rx = rx.clone();
let serv_cont = ServicesController::new().with_access_name(
&serv.hostname,
&access_url
);
// triggers
let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone());
let triggers = Triggers::new_service(arc, serv.triggers.wait);
if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
proc.add_process(&prc.name, triggers, rx);
} else {
// vecdeque for queue
let mut vec: VecDeque<Arc<str>> = VecDeque::new();
vec.push_back(proc_name.clone());
// connection_queue
let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> = BTreeMap::new();
connection_queue.insert(serv.triggers.wait, vec);
// event_reg
let mut hm = HashMap::new();
hm.insert(proc_name.clone(), (triggers, rx));
let serv_cont = serv_cont.with_params(connection_queue, hm);
self.services.push_back(serv_cont);
}
});
});
self
}
pub fn get_stats(&self) -> String {
format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
}
}
#[async_trait]
impl ProcessUnit for Supervisor {
async fn process(&mut self) {
info!("Initializing monitoring ...");
loop {
// dbg!(&self);
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
// let res = tokio::join!(prc.process(), file.process(), serv.process());
if let Some(mut val) = self.prcs.pop_front() {
tasks.push(
tokio::spawn( async move {
val.process().await;
ControllerResult::Process(Some(val))
})
);
}
if let Some(mut val) = self.files.pop_front() {
tasks.push(
tokio::spawn( async move {
val.process().await;
ControllerResult::File(Some(val))
})
);
}
if let Some(mut val) = self.services.pop_front() {
tasks.push(
tokio::spawn( async move {
val.process().await;
ControllerResult::Service(Some(val))
})
);
}
for task in tasks {
match task.await {
Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val),
Ok(ControllerResult::File(Some(val))) => self.files.push_back(val),
Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val),
Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."),
_ => { /* DEAD END (CAN NOT BE EXECUTED) */},
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
// spawn tasks
// spawn prc
// spawn files
// spawn services
// ## for ... i.await in loop
pub async fn init_monitoring(
config: Processes
) -> anyhow::Result<()> {
let mut supervisor = Supervisor::new().with_config(&config).await;
info!("Monitoring: {} ", &supervisor.get_stats());
supervisor.process().await;
Ok(())
}
// async fn generate_controllers<'a>(config: Processes) -> (HashSet<ProcessesController<'a>>, HashSet<FilesController<'a>>, HashSet<ServicesController<'a>>) {
// let mut prcs: HashSet<ProcessesController<'a>> = HashSet::new();
// let mut files: HashSet<FilesController<'a>> = HashSet::new();
// let mut services: HashSet<ServicesController<'a>> = HashSet::new();
// for prc in config.processes {
// let (rx, tx) = mpsc::channel::<Events<'a>>(10);
// // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path);
// let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path);
// let a = new_prc.process().await;
// }
// (prcs, files, services)
// }
// spawn prc check with semaphore check
async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) }
// spawn file check with semaphore check
async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) }
// spawn service check with semaphore check
async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) }
}
/// # Fn `run_daemons` /// # Fn `run_daemons`
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel /// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
/// ///
@ -40,37 +222,37 @@ const GET_ID_CMD: &str = "hostname";
/// ///
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process /// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
/// ///
pub async fn run_daemons( // pub async fn run_daemons(
proc: Arc<TrackingProcess>, // proc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>, // tx: Arc<mpsc::Sender<u8>>,
rx: &mut mpsc::Receiver<u8>, // rx: &mut mpsc::Receiver<u8>,
) { // ) {
// creating watchers + ---buffers--- // // creating watchers + ---buffers---
let mut watchers: Vec<Inotify> = vec![]; // let mut watchers: Vec<Inotify> = vec![];
for file in proc.dependencies.files.clone().into_iter() { // for file in proc.dependencies.files.clone().into_iter() {
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { // if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
watchers.push(watcher); // watchers.push(watcher);
} else { // } else {
let _ = tx.send(121).await; // let _ = tx.send(121).await;
} // }
// watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); // // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
} // }
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> = // let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
Arc::new(tokio::sync::Mutex::new(watchers)); // Arc::new(tokio::sync::Mutex::new(watchers));
loop { // loop {
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); // let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
tokio::select! { // tokio::select! {
_ = run_hand => continue, // _ = run_hand => continue,
_val = rx.recv() => { // _val = rx.recv() => {
if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { // if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
return; // return;
} // }
}, // },
} // }
tokio::task::yield_now().await; // tokio::task::yield_now().await;
} // }
} // }
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
match val { match val {
@ -133,7 +315,6 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
}, },
// // 9 - File-dependency change -> staying (after check) // // 9 - File-dependency change -> staying (after check)
9 => { 9 => {
// no need to trash logs
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
}, },
@ -201,36 +382,36 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
/// ///
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` /// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
/// ///
pub async fn running_handler( // pub async fn running_handler(
prc: Arc<TrackingProcess>, // prc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>, // tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>, // watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) { // ) {
// services and files check (once) // // services and files check (once)
let files_check = file_handler( // let files_check = file_handler(
&prc.name, // &prc.name,
&prc.dependencies.files, // &prc.dependencies.files,
tx.clone(), // tx.clone(),
watchers.clone(), // watchers.clone(),
); // );
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); // let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
let res = join!(files_check, services_check); // let res = join!(files_check, services_check);
// if inactive -> spawn checks -> active is true // // if inactive -> spawn checks -> active is true
if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { // if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
if start_process(&prc.name, &prc.path).await.is_err() { // if start_process(&prc.name, &prc.path).await.is_err() {
tx.send(3).await.unwrap(); // tx.send(3).await.unwrap();
return; // return;
} // }
} // }
// if frozen -> spawn checks -> unfreeze is true // // if frozen -> spawn checks -> unfreeze is true
else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { // else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
tx.send(10).await.unwrap(); // tx.send(10).await.unwrap();
return; // return;
} // }
// tokio::time::sleep(Duration::from_millis(100)).await; // // tokio::time::sleep(Duration::from_millis(100)).await;
tokio::task::yield_now().await; // tokio::task::yield_now().await;
} // }
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
/// # Fn `get_container_id` /// # Fn `get_container_id`

View File

@ -1,183 +1,321 @@
use crate::options::structs::{CustomError, Files}; use crate::options::structs::{CustomError, Files};
use super::prcs::{is_active, is_frozen}; use super::prcs::{is_active, is_frozen};
use inotify::{EventMask, Inotify, WatchMask}; use inotify::{EventMask, Inotify, WatchMask};
use std::borrow::BorrowMut; use std::borrow::BorrowMut;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::Duration; use tokio::sync::mpsc::Sender as Sender;
use tokio::time::Duration;
use crate::options::structs::Events;
use async_trait::async_trait;
/// # Fn `create_watcher` pub mod v2 {
/// ## for creating watcher on file's delete | update events use log::{error, info, warn};
/// use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
/// *input* : `&str`, `&str` use super::*;
/// use std::{collections::HashMap, path::Path};
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
///
/// *managing* : current file's name: &str, path in local storage to current file: &str
///
/// *depends on* : -
///
pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
Ok(inotify)
}
/// # Fn `create_watcher` type MpscSender = Arc<Sender<Events>>;
/// ## for managing processes by checking dep files' states type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
///
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>` #[derive(Debug)]
/// enum FileState {
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check Ok,
/// NotFound,
/// *initiator* : fn `utils::running_handler` }
///
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>` #[derive(Debug)]
/// pub struct FilesController {
/// *depends on* : Files name : Arc<str>,
/// path : String,
pub async fn file_handler( code_name : Arc<str>,
name: &str, state : FileState,
files: &[Files], watcher : Option<Inotify>,
tx: Arc<mpsc::Sender<u8>>, triggers : EventHandlers,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>, }
) -> Result<(), CustomError> {
for (i, file) in files.iter().enumerate() { impl PartialEq for FilesController {
// let src = format!("{}{}", file.src, file.filename); fn eq(&self, other: &Self) -> bool {
if check_file(&file.filename, &file.src).await.is_err() { self.code_name == other.code_name
if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal);
} }
match file.triggers.on_delete.as_str() { }
"stay" => {
tx.send(9).await.unwrap(); impl FilesController {
continue; pub fn new(name: &str, triggers: EventHandlers) -> FilesController {
} let name: Arc<str> = Arc::from(name);
"stop" => { Self {
if is_active(name).await { name : name.clone(),
tx.send(1).await.unwrap(); path : String::new(),
} state : FileState::Ok,
return Err(CustomError::Fatal); watcher : None,
} triggers,
"hold" => { code_name : name.clone(),
if is_active(name).await {
tx.send(2).await.unwrap();
return Err(CustomError::Fatal);
}
}
_ => {
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap();
return Err(CustomError::Fatal);
} }
} }
} else if is_active(name).await && !is_frozen(name).await { pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController> {
let watchers = watchers.clone(); self.path = path.as_ref().to_string_lossy().into_owned();
// println!("mutex: {:?}", watchers); self.watcher = {
let mut buffer = [0; 128]; match create_watcher(&self.name, &self.path) {
let mut mutex_guard = watchers.lock().await; Ok(val) => Some(val),
if let Some(notify) = mutex_guard.get_mut(i) { Err(er) => {
let events = notify.read_events(&mut buffer); error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
// println!("{:?}", events); return Err(er)
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
for event in events {
if let EventMask::DELETE_SELF = event {
// ! warning (DELETE_SELF event) !
// println!("! warning (DELETE_SELF event) !");
// * watcher recreation after dealing with file recreation mechanism in text editors
let mutex = notify.borrow_mut();
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
*mutex = watcher;
}
} }
match file.triggers.on_change.as_str() { }
"stop" => { };
let _ = tx.send(7).await; self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name));
Ok(self)
}
pub fn add_event(&mut self, file_controller : FilesController) {
for (k, v) in file_controller.triggers {
self.triggers.entry(k).or_insert(v);
}
}
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
for (prc_name, (triggers, channel)) in &self.triggers {
let msg = match &trigger_type {
None => {
Events::Positive(self.code_name.clone())
},
Some(event) => {
info!("Event on file {} ({}) : {}. Notifying `{}` ...", &self.name, &self.path, event, &prc_name);
event.event_from_file_trigger_controller(self.code_name.clone(), &triggers)
},
};
let _ = channel.send(msg).await;
}
}
}
#[async_trait]
impl ProcessUnit for FilesController {
async fn process(&mut self) {
// polling file check
// 1) existing check
// dbg!(&self);
if let Ok(_) = check_file(&self.name, &self.path).await {
if let FileState::NotFound = self.state {
info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name);
self.state = FileState::Ok;
// reseting negative outcome in prc
self.trigger_on(None).await;
}
match &mut self.watcher {
Some(notify) => {
let mut buffer = [0; 1024];
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
// notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);});
// todo!();
if let (recreate_watcher, true) = (
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
notif_events.any(|mask| mask.mask == EventMask::MODIFY)
) {
warn!("File {} ({}) was changed", self.name, &self.path);
if recreate_watcher {
self.watcher = match create_watcher(&self.name, &self.path) {
Ok(notifier) => Some(notifier),
Err(er) => {
error!("Failed to recreate watcher for {} ({}) due to {}",
self.name,
&self.path,
er
);
None
},
}
}
self.trigger_on(Some(FileTriggerType::OnChange)).await;
return;
}
} }
"restart" => { },
let _ = tx.send(8).await; None => { /* DEAD END */},
}
} else {
if let FileState::Ok = self.state {
warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
self.state = FileState::NotFound;
self.trigger_on(Some(FileTriggerType::OnDelete)).await;
}
return;
}
self.trigger_on(None).await;
// 2) change check
}
}
}
/// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events
///
/// *input* : `&str`, `&str`
///
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
///
/// *managing* : current file's name: &str, path in local storage to current file: &str
///
/// *depends on* : -
///
pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
Ok(inotify)
}
/// # Fn `create_watcher`
/// ## for managing processes by checking dep files' states
///
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *depends on* : Files
///
pub async fn file_handler(
name: &str,
files: &[Files],
tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> anyhow::Result<()> {
for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() {
if !is_active(name).await || is_frozen(name).await {
return Err(anyhow::Error::msg("Process is frozen or stopped"));
}
match file.triggers.on_delete.as_str() {
"stay" => {
tx.send(9).await.unwrap();
continue;
}
"stop" => {
if is_active(name).await {
tx.send(1).await.unwrap();
}
return Err(anyhow::Error::msg("Process was stopped"));
}
"hold" => {
if is_active(name).await {
tx.send(2).await.unwrap();
return Err(anyhow::Error::msg("Process was frozen"));
}
}
_ => {
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap();
return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
}
}
} else if is_active(name).await && !is_frozen(name).await {
let watchers = watchers.clone();
// println!("mutex: {:?}", watchers);
let mut buffer = [0; 128];
let mut mutex_guard = watchers.lock().await;
if let Some(notify) = mutex_guard.get_mut(i) {
let events = notify.read_events(&mut buffer);
// println!("{:?}", events);
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
for event in events {
if let EventMask::DELETE_SELF = event {
// ! warning (DELETE_SELF event) !
// println!("! warning (DELETE_SELF event) !");
// * watcher recreation after dealing with file recreation mechanism in text editors
let mutex = notify.borrow_mut();
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
if let Ok(watcher) = create_watcher(&file.filename, &file.src) {
*mutex = watcher;
}
} }
"stay" => { match file.triggers.on_change.as_str() {
let _ = tx.send(9).await; "stop" => {
} let _ = tx.send(7).await;
_ => { }
let _ = tx.send(101).await; "restart" => {
let _ = tx.send(8).await;
}
"stay" => {
let _ = tx.send(9).await;
}
_ => {
let _ = tx.send(101).await;
}
} }
} }
} }
} }
} }
} }
tokio::task::yield_now().await;
Ok(())
} }
tokio::task::yield_now().await;
Ok(())
}
/// # Fn `check_file` /// # Fn `check_file`
/// ## for checking existance of current file /// ## for checking existance of current file
/// ///
/// *input* : `&str`, `&str` /// *input* : `&str`, `&str`
/// ///
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error /// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
/// ///
/// *initiator* : fn `file_handler` /// *initiator* : fn `file_handler`
/// ///
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str` /// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
/// ///
/// *depends on* : network activity /// *depends on* : network activity
/// ///
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string()); let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string()); let arc_path = Arc::new(path.to_string());
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let file_concat = format!("{}{}", arc_path, arc_name); let file_concat = format!("{}{}", arc_path, arc_name);
let path = Path::new(&file_concat); let path = Path::new(&file_concat);
if path.exists() { if path.exists() {
Ok(()) Ok(())
} else { } else {
Err(CustomError::Fatal) Err(CustomError::Fatal)
}
})
.await
.unwrap_or_else(|_| {
panic!("Corrupted while file check process");
})
}
#[cfg(test)]
mod files_unittests {
use super::*;
#[tokio::test]
async fn try_to_create_watcher() {
let res = create_watcher("dep-file", "./tests/examples/");
assert!(res.is_ok());
}
#[tokio::test]
async fn try_to_create_invalid_watcher() {
let res = create_watcher("invalid-file", "/path/to/the/no/dir");
assert!(res.is_err());
}
#[tokio::test]
async fn check_existing_file() {
let res = check_file("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn check_non_existing_file() {
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
} }
})
.await
.unwrap_or_else(|_| {
panic!("Corrupted while file check process");
})
}
#[cfg(test)]
mod files_unittests {
use super::*;
#[tokio::test]
async fn try_to_create_watcher() {
let res = create_watcher("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
} }
#[tokio::test]
async fn try_to_create_invalid_watcher() {
let res = create_watcher("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
#[tokio::test]
async fn check_existing_file() {
let res = check_file("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn check_non_existing_file() {
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
}

View File

@ -1,8 +1,132 @@
use crate::options::structs::CustomError;
use log::{error, warn}; use log::{error, warn};
use std::process::{Command, Output}; use std::process::{Command, Output};
use std::sync::Arc; use std::sync::Arc;
use tokio::time::Duration; use tokio::time::Duration;
use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit};
use std::collections::HashSet;
use tokio::sync::mpsc::Receiver as MpscReciever;
use async_trait::async_trait;
pub mod v2 {
use log::info;
use crate::options::structs::DependencyType;
use std::path::Path;
use super::*;
#[derive(Debug)]
pub struct ProcessesController {
name: Arc<str>,
bin: String,
// obj: Arc<TrackingProcess>,
state: ProcessState,
event_reader: MpscReciever<Events>,
negative_events: HashSet<Arc<str>>,
}
impl PartialEq for ProcessesController {
fn eq(&self, other: &Self) -> bool {
self.bin == other.bin
}
}
impl ProcessesController {
pub fn new(name: &str, event_reader: MpscReciever<Events>) -> ProcessesController {
ProcessesController {
name : Arc::from(name),
bin : String::new(),
state : ProcessState::Stopped,
event_reader,
negative_events : HashSet::new(),
}
}
pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController {
self.bin = bin.as_ref().to_string_lossy().into_owned();
self
}
async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
match trigger {
"stay" => {
info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name);
},
"stop" => {
if is_active(&self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
terminate_process(&self.name).await;
self.state = ProcessState::Stopped;
}
},
"hold" => {
if !is_frozen(&self.name).await {
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
freeze_process(&self.name).await;
self.state = ProcessState::Holding;
}
},
"restart" => {
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
let _ = restart_process(&self.name, &self.bin).await;
},
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
}
tokio::time::sleep(Duration::from_micros(100)).await;
}
}
#[async_trait]
impl ProcessUnit for ProcessesController {
async fn process(&mut self) {
if self.negative_events.len() == 0 {
match self.state {
ProcessState::Holding => {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
if let Err(er) = unfreeze_process(&self.name).await {
error!("Cannot unfreeze process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
ProcessState::Stopped => {
info!("No negative dependecies events on {} process. Starting ...", self.name);
if let Err(er) = start_process(&self.name, &self.bin).await {
error!("Cannot start process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
_ => {},
}
}
while let Ok(event) = self.event_reader.try_recv() {
match event {
Events::Positive(target) => {
if self.negative_events.contains(&target) {
self.negative_events.remove(&target);
}
},
Events::Negative(event) => {
match event {
NegativeOutcomes::FileWasChanged(target, dep_type, trigger) |
NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) |
NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
if !self.negative_events.contains(&target) {
self.negative_events.insert(target.clone());
self.trigger_on(
&target,
&trigger,
dep_type
).await;
}
},
}
},
}
}
}
}
}
/// # Fn `get_pid` /// # Fn `get_pid`
/// ## for initializing process of unstoppable grubbing metrics. /// ## for initializing process of unstoppable grubbing metrics.
@ -162,14 +286,11 @@ pub async fn freeze_process(name: &str) {
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn unfreeze_process(name: &str) { pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> {
let _ = Command::new("pkill") let _ = Command::new("pkill")
.args(["-CONT", name]) .args(["-CONT", name])
.output() .output()?;
.unwrap_or_else(|_| { Ok(())
error!("Failed to unfreeze process");
std::process::exit(101);
});
} }
/// # Fn `restart_process` /// # Fn `restart_process`
@ -185,7 +306,7 @@ pub async fn unfreeze_process(name: &str) {
/// ///
/// *depends on* : fn `start_process`, fn `terminate_process` /// *depends on* : fn `start_process`, fn `terminate_process`
/// ///
pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> { pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> {
terminate_process(name).await; terminate_process(name).await;
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
start_process(name, path).await start_process(name, path).await
@ -204,7 +325,7 @@ pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError>
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> {
// let runsh = format!("{} {}", "exec", path); // let runsh = format!("{} {}", "exec", path);
let mut command = Command::new(path); let mut command = Command::new(path);
// command.arg(path); // command.arg(path);
@ -215,8 +336,7 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
Ok(()) Ok(())
} }
Err(er) => { Err(er) => {
println!("{:?}", er); Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er)))
Err(CustomError::Fatal)
} }
} }
} }

View File

@ -1,10 +1,193 @@
use crate::options::structs::{CustomError, Services}; use crate::options::structs::CustomError;
use super::prcs::{is_active, is_frozen};
use log::{error, warn}; use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::time::Duration;
use tokio::time::{Duration, Instant}; use tokio::sync::mpsc::Sender as Sender;
use async_trait::async_trait;
pub mod v2 {
use log::info;
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
use super::*;
use std::collections::{HashMap, BTreeMap, VecDeque};
type MpscSender = Arc<Sender<Events>>;
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
// type wrapper for service wait queue
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
#[derive(Debug)]
pub struct ServicesController {
// i.e. yandex.ru
#[allow(unused)]
name : String,
// i.e. yandex.ru:443
access_url : Arc<str>,
// "OK" or "Unavailable"
state: ServiceState,
// btree map with key as max wait time and it's key to hashmap
config: ConnectionQueue,
// Map of processes with their (trigger and mpsc sender)
event_registrator : EventHandlers,
}
impl PartialEq for ServicesController {
fn eq(&self, other: &Self) -> bool {
self.access_url == other.access_url
}
}
impl ServicesController {
pub fn new() -> ServicesController {
ServicesController {
name : String::new(),
access_url : Arc::from(String::new()),
state : ServiceState::Unavailable,
config: ConnectionQueue::new(),
event_registrator : EventHandlers::new(),
}
}
pub fn with_access_name(
mut self,
hostname: &str,
access_url: &str,
) -> ServicesController {
self.name = hostname.to_string();
self.access_url = Arc::from(access_url);
self
}
pub fn with_params(
mut self,
conn_queue: ConnectionQueue,
event_reg: EventHandlers,
) -> ServicesController {
self.config = conn_queue;
self.event_registrator = event_reg;
self
}
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
}
pub fn add_process(
&mut self,
proc_name: &str,
trigger: Triggers,
sender: MpscSender,
) {
let proc_name: Arc<str> = Arc::from(proc_name);
// queue add
if let Triggers::Service { wait, .. } = trigger {
self.config.entry(wait)
.and_modify(|el| el.push_back(proc_name.clone()))
.or_insert({
let mut temp = VecDeque::new();
temp.push_back(proc_name.clone());
temp
});
}
// event add
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
}
async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?;
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
}
Ok(())
}
async fn trigger_on(&mut self) {
match self.state {
ServiceState::Ok => {
let _ = self.event_registrator
.iter()
.map(|(_, (_, el))| async {
let _ = el.send(Events::Positive(self.access_url.clone())).await;
});
},
ServiceState::Unavailable => {
// looped check and notifying
self.looped_check().await;
},
}
}
async fn looped_check(self: &mut Self) {
let longest = self.config.last_entry().unwrap();
let longest = longest.key();
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
let timer = tokio::time::Instant::now();
let mut attempt: u32 = 1;
let access_url = Arc::new(self.access_url.clone());
// let event_registrator = &mut self.event_registrator;
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
// let access_url = access_url.clone();
loop {
interapter.tick().await;
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
attempt += 1;
let state_check_result = self.check_state().await;
if state_check_result.is_ok() {
info!("Connection to {} is `OK` now", &access_url);
self.state = ServiceState::Ok;
break;
} else {
let now = timer.elapsed();
let iterator = self.config.iter()
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
.flat_map(|(_, a)| a.iter().cloned())
.collect::<VecDeque<Arc<str>>>();
for name in iterator {
let proc_name = name.to_string();
info!("Trying to notify process `{}` ...", &proc_name);
let sender_opt = self.event_registrator.get(&name)
.map(|(trigger, sender)|
(trigger.to_service_negative_event(name.clone()), sender)
);
if let Some((tr, tx)) = sender_opt {
let _ = tx.send(tr.unwrap()).await;
} else {
error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url)
}
}
}
}
}).await {
info!("Timeout of establishing connection to {}. ", &access_url);
}
}
}
#[async_trait]
impl ProcessUnit for ServicesController {
async fn process(&mut self) {
// check_service(hostname, port)
let current_state = self.check_state().await;
match (&self.state, current_state) {
(ServiceState::Unavailable, Ok(_)) => {
warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
self.state = ServiceState::Ok;
self.trigger_on().await;
},
(ServiceState::Ok, Err(_)) => {
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
self.state = ServiceState::Unavailable;
self.trigger_on().await;
},
(ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url),
_ => { /* DEAD END WITH NO INTEREST */ },
}
}
}
}
/// # Fn `service_handler` /// # Fn `service_handler`
/// ## function to realize mechanism of current process' dep services monitoring /// ## function to realize mechanism of current process' dep services monitoring
@ -19,53 +202,53 @@ use tokio::time::{Duration, Instant};
/// ///
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` /// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
/// ///
pub async fn service_handler( // pub async fn service_handler(
name: &str, // name: &str,
services: &Vec<Services>, // services: &Vec<Services>,
tx: Arc<mpsc::Sender<u8>>, // tx: Arc<mpsc::Sender<u8>>,
) -> Result<(), CustomError> { // ) -> Result<(), CustomError> {
// println!("service daemon on {}", name); // // println!("service daemon on {}", name);
for serv in services { // for serv in services {
if check_service(&serv.hostname, &serv.port).await.is_err() { // if check_service(&serv.hostname, &serv.port).await.is_err() {
if !is_active(name).await || is_frozen(name).await { // if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal); // return Err(CustomError::Fatal);
} // }
error!( // error!(
"Service {}:{} is unreachable for process {}", // "Service {}:{} is unreachable for process {}",
&serv.hostname, &serv.port, &name // &serv.hostname, &serv.port, &name
); // );
match serv.triggers.on_lost.as_str() { // match serv.triggers.on_lost.as_str() {
"stay" => { // "stay" => {
tx.send(4).await.unwrap(); // tx.send(4).await.unwrap();
continue; // continue;
} // }
"stop" => { // "stop" => {
if looped_service_connecting(name, serv).await.is_err() { // if looped_service_connecting(name, serv).await.is_err() {
tx.send(5).await.unwrap(); // tx.send(5).await.unwrap();
tokio::task::yield_now().await; // tokio::task::yield_now().await;
return Err(CustomError::Fatal); // return Err(CustomError::Fatal);
} // }
} // }
"hold" => { // "hold" => {
// if is_frozen(name).await { // // if is_frozen(name).await {
// return Err(CustomError::Fatal); // // return Err(CustomError::Fatal);
// } // // }
if looped_service_connecting(name, serv).await.is_err() { // if looped_service_connecting(name, serv).await.is_err() {
tx.send(6).await.unwrap(); // tx.send(6).await.unwrap();
tokio::task::yield_now().await; // tokio::task::yield_now().await;
return Err(CustomError::Fatal); // return Err(CustomError::Fatal);
} // }
} // }
_ => { // _ => {
tx.send(101).await.unwrap(); // tx.send(101).await.unwrap();
return Err(CustomError::Fatal); // return Err(CustomError::Fatal);
} // }
} // }
} // }
} // }
tokio::time::sleep(Duration::from_millis(100)).await; // tokio::time::sleep(Duration::from_millis(100)).await;
Ok(()) // Ok(())
} // }
/// # Fn `looped_service_connecting` /// # Fn `looped_service_connecting`
/// ## for service's state check in loop (with delay and restriction of attempts) /// ## for service's state check in loop (with delay and restriction of attempts)
@ -80,54 +263,54 @@ pub async fn service_handler(
/// ///
/// *depends on* : fn `check_service` /// *depends on* : fn `check_service`
/// ///
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { // async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
if serv.triggers.wait == 0 { // if serv.triggers.wait == 0 {
loop { // loop {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; // tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!( // warn!(
"Attempting to connect from {} process to {}:{}", // "Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port // &name, &serv.hostname, &serv.port
); // );
match check_service(&serv.hostname, &serv.port).await { // match check_service(&serv.hostname, &serv.port).await {
Ok(_) => { // Ok(_) => {
log::info!( // log::info!(
"Successfully connected to {} from {} process!", // "Successfully connected to {} from {} process!",
&serv.hostname, // &serv.hostname,
&name // &name
); // );
break; // break;
} // }
Err(_) => { // Err(_) => {
tokio::task::yield_now().await; // tokio::task::yield_now().await;
} // }
} // }
} // }
Ok(()) // Ok(())
} else { // } else {
let start = Instant::now(); // let start = Instant::now();
while start.elapsed().as_secs() < serv.triggers.wait.into() { // while start.elapsed().as_secs() < serv.triggers.wait.into() {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; // tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!( // warn!(
"Attempting to connect from {} process to {}:{}", // "Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port // &name, &serv.hostname, &serv.port
); // );
match check_service(&serv.hostname, &serv.port).await { // match check_service(&serv.hostname, &serv.port).await {
Ok(_) => { // Ok(_) => {
log::info!( // log::info!(
"Successfully connected to {} from {} process!", // "Successfully connected to {} from {} process!",
&serv.hostname, // &serv.hostname,
&name // &name
); // );
return Ok(()); // return Ok(());
} // }
Err(_) => { // Err(_) => {
tokio::task::yield_now().await; // tokio::task::yield_now().await;
} // }
} // }
} // }
Err(CustomError::Fatal) // Err(CustomError::Fatal)
} // }
} // }
/// # Fn `check_service` /// # Fn `check_service`
/// ## for check current service's availiability /// ## for check current service's availiability