Compare commits

..

44 Commits

Author SHA1 Message Date
prplV aae0391a0c files fix 2025-05-05 09:14:25 -04:00
prplV 3dd238cf97 debug -> opened issue #40 2025-05-04 11:01:07 -04:00
prplV 403285a937 OWNERSHIP FIX 3 2025-05-04 09:53:59 -04:00
prplV a16eb78b79 key: String -> Arc<str> 2025-05-04 09:31:42 -04:00
prplV 281841b68a OWNERSHIP FIX : prcs with Arc<str> (instead of &'_ str) 2025-05-04 09:19:50 -04:00
prplV 34979a035d OWNERSHIP FIX: files with Arc<str> 2025-05-04 09:15:03 -04:00
prplV 09c1baed8e structs bug fixed 2025-05-04 09:14:40 -04:00
prplV 3c22a67052 OWNERSHIP FIX: services with ownership using Arc<str> 2025-05-04 08:59:55 -04:00
prplV 052448a7b9 OWNERSHIP FIX: structs with Arc<str> (instead of &'_ str) 2025-05-04 08:59:25 -04:00
prplV 6d56d1e39c big change but it's still not working in utils.rs 2025-04-25 10:56:42 -04:00
prplV 541b0f52dd supervisor work 2025-04-23 10:34:07 -04:00
prplV 2495fb84cf services controller fixed (fuh) 2025-04-22 11:22:22 -04:00
prplV c3fd0dd09f files logic fixed 2025-04-22 11:22:07 -04:00
prplV 502ea114a6 debug + casting 2025-04-22 11:18:36 -04:00
prplV 28092d945a prcs logic fixed 2025-04-22 11:18:14 -04:00
prplV 0d68efd461 service controller 2025-04-18 08:44:35 -04:00
prplV 4fc90300fc file fix 2025-04-18 08:44:16 -04:00
prplV e3f07f42a6 new structs, controller and processors 2025-04-17 09:59:33 -04:00
prplV cd7669d942 controllers impls and trait 2025-04-15 10:32:29 -04:00
prplV 721fa6c758 prc state check logic fixed without code repeating 2025-04-10 08:58:40 -04:00
prplV c50c444f21 pub commands (need later) 2025-04-10 08:54:12 -04:00
prplV 71acb4a32e services v2 controller without impl 2025-04-10 08:54:00 -04:00
prplV f504632c4d files v2 controller without impl 2025-04-10 08:53:52 -04:00
prplV 2b82fb7aac prcs v2 controller with impl 2025-04-10 08:53:38 -04:00
prplV 584404c050 preparing for prcsv2 2025-04-10 08:52:58 -04:00
prplV 886ae6308b versions change 2025-03-31 10:08:50 -04:00
prplV 8f1214bd9a refactor 2025-03-31 10:06:18 -04:00
prplV 011c479550 big async optimization 2025-03-31 09:46:22 -04:00
prplV 27e79ce731 us update 2 2025-03-28 07:35:07 -04:00
prplV 026a502044 us changes 2025-03-28 05:13:43 -04:00
prplV 163887d42c tcp -> unixsocket 2025-03-27 10:46:22 -04:00
prplV 35a21da431 reding config lc+pubsub rework 2025-03-27 10:33:37 -04:00
prplV 064611823a ass 2025-02-06 15:08:16 +03:00
prplV e4c3e5f46f fix of lc_result ending firstly 2025-02-06 14:39:32 +03:00
prplV 6adab1b903 joinhandlers ending is catching and processing 2025-02-06 14:37:17 +03:00
prplV 98da769dd3 lc check delay decreased 2025-02-06 13:50:18 +03:00
prplV a8a7fd8a72 init_config_mechanism 50% 2025-02-05 17:50:37 +03:00
prplV 3d88967281 pubsub_config_reciever + cli-local adj 2025-02-05 17:38:41 +03:00
prplV 8c1998c93f local_config_reciever created with fn's helpers 2025-02-05 14:47:08 +03:00
prplV f560dfebc5 planning +lcr 2025-02-04 18:34:21 +03:00
prplV 0f160f4dcd deafult processes 2025-02-04 17:57:55 +03:00
prplV 7a5704dd93 cli_config_reciever fn added 2025-02-04 16:53:29 +03:00
prplV df03bd5346 config pubsub pre-dev skeleton 2025-02-03 13:58:01 +03:00
prplV e9b6abefdf preboot changed + config setting up 2025-02-03 12:12:18 +03:00
18 changed files with 1726 additions and 554 deletions

1
.gitignore vendored
View File

@ -4,3 +4,4 @@
Cargo.lock
hagent_test.sock
release
*.sock

View File

@ -1,6 +1,6 @@
[package]
name = "noxis-cli"
version = "0.2.4"
version = "0.2.7"
edition = "2021"
[dependencies]

View File

@ -2,11 +2,17 @@ use clap::{Parser, Subcommand};
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
pub struct Cli {
#[arg(
short,
default_value="noxis-rs.sock",
help="explicit specify of NOXIS Socket file"
)]
pub socket : String,
#[command(
subcommand,
help = "to manage Noxis work",
)]
command : Commands,
pub command : Commands,
}
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
@ -50,13 +56,13 @@ pub struct StartAction {
num_args = 1..,
value_delimiter = ' '
)]
flags : Vec<String>,
pub flags : Vec<String>,
}
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
pub struct ConfigCommand {
#[command(subcommand)]
action : ConfigAction,
pub action : ConfigAction,
}
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
@ -83,12 +89,12 @@ pub struct LocalConfig {
action,
help = "to read following input as JSON",
)]
is_json : bool,
pub is_json : bool,
// value
#[arg(
help = "path to config file or config String (with --json flag)",
)]
config : String,
pub config : String,
}
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
@ -96,16 +102,16 @@ pub struct ProcessCommand {
#[arg(
help = "name of needed process",
)]
process : String,
pub process : String,
#[command(
subcommand,
help = "To get current process's status",
)]
action : ProcessAction,
pub action : ProcessAction,
}
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
enum ProcessAction {
pub enum ProcessAction {
#[command(
about = "To get info about current process status",
)]

View File

@ -1,14 +1,15 @@
use thiserror::Error;
use super::cli_net::NOXIS_RS_CREDS;
#[derive(Debug, Error)]
pub enum NoxisCliError {
#[error("Can't send any data to {:?}. Noxis-rs daemon is disabled or can't be accessed", NOXIS_RS_CREDS)]
NoxisDaemonMissing,
#[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's web-functionality")]
#[error("Can't find socket `{0}`. Error : {1}")]
NoxisDaemonMissing(String, String),
#[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")]
PortIsNotWritable,
#[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")]
CliPromptCanNotBeSent,
#[error("Can't parse CLI struct and send as byte stream")]
ToStringCliParsingParsing,
#[error("Can't read Noxis response due to {0}")]
CliResponseReadError(String)
}

View File

@ -1,32 +1,30 @@
use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use tokio::time::{Duration, sleep};
use anyhow::Result;
use super::Cli;
use super::cli_error::NoxisCliError;
pub const NOXIS_RS_CREDS: &str = "127.0.0.1:7753";
pub async fn create_tcp_stream() -> Result<TcpStream> {
Ok(TcpStream::connect(NOXIS_RS_CREDS).await.map_err(|_| NoxisCliError::NoxisDaemonMissing)?)
async fn create_us_stream(cli: &Cli) -> Result<UnixStream> {
Ok(UnixStream::connect(&cli.socket).await.map_err(|er| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string(), er.to_string()))?)
}
pub async fn try_send(stream: Result<TcpStream>, params: Cli) -> Result<()> {
use serde_json::to_string;
let mut stream = stream.map_err(|_| NoxisCliError::NoxisDaemonMissing)?;
loop {
if stream.writable().await.is_err() {
sleep(Duration::from_millis(100)).await;
continue;
}
// let msg: Cli = from_str(&format!("{:?}", params))?;
let msg= to_string(&params).map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
// let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@";
pub async fn try_send(cli: Cli) -> Result<()> {
// let stream = create_us_stream(&cli).await;
let mut stream = create_us_stream(&cli).await?;
stream.write_all(msg.as_bytes()).await.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
// ...
break;
}
let msg = serde_json::to_vec(&cli)
.map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
stream.write_all(&msg)
.await
.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
let mut response = [0; 1024];
stream.read(&mut response)
.await
.map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?;
println!("Received response: {}", String::from_utf8_lossy(&response));
Ok(())
}

View File

@ -4,12 +4,12 @@ mod cli_error;
use clap::Parser;
use cli::Cli;
use cli_net::{create_tcp_stream, try_send};
use cli_net::try_send;
use anyhow::Result;
#[tokio::main]
async fn main() -> Result<()>{
let cli = Cli::parse();
try_send(create_tcp_stream().await, cli).await?;
try_send(cli).await?;
Ok(())
}

View File

@ -1,6 +1,6 @@
[package]
name = "noxis-rs"
version = "0.11.10"
version = "0.11.26"
edition = "2021"
[dependencies]
@ -8,13 +8,15 @@ anyhow = "1.0.93"
chrono = "0.4.38"
clap = { version = "4.5.21", features = ["derive"] }
env_logger = "0.11.3"
inotify = "0.10.2"
inotify = "0.11.0"
log = "0.4.22"
pcap = "2.2.0"
redis = "0.25.4"
redis = "0.29.2"
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.118"
sysinfo = "0.32.0"
tokio = { version = "1.38.0", features = ["full", "time"] }
noxis-cli = { path = "../noxis-cli" }
dotenv = "0.15.0"
futures = "0.3.31"
async-trait = "0.1.88"

View File

@ -1,6 +1,6 @@
{
"dateOfCreation": "1721381809104",
"configServer": "localhost",
"dateOfCreation": "1721381809112",
"configServer": "192.168.2.37",
"processes": [
{
"name": "temp-process",
@ -12,7 +12,7 @@
"src": "./tests/examples/",
"triggers": {
"onDelete": "stop",
"onChange": "stay"
"onChange": "restart"
}
}
],
@ -22,8 +22,7 @@
"port": 443,
"triggers": {
"wait": 10,
"delay": 2,
"onLost": "hold"
"onLost": "restart"
}
}
]

View File

@ -1,7 +1,6 @@
mod options;
mod utils;
use anyhow::Error;
use clap::Parser;
use log::{error, info};
use options::config::*;
@ -14,84 +13,140 @@ use std::time::Duration;
use tokio::sync::mpsc;
use utils::*;
use options::preboot::PrebootParams;
use tokio::sync::{broadcast, oneshot};
use options::config::v2::init_config_mechanism;
use utils::v2::init_monitoring;
#[tokio::main(flavor = "multi_thread")]
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()>{
let preboot = Arc::new(PrebootParams::parse().validate()?);
let _ = setup_logger();
info!("Runner is configurating...");
// setting up redis connection \
// then conf checks to choose the most actual \
let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
error!("No actual configuration for runner. Stopping...");
std::process::exit(1);
});
info!(
"Current runner configuration: {}",
&processes.date_of_creation
);
info!("Runner is ready. Initializing...");
if processes.processes.is_empty() {
error!("Processes list is null, runner-rs initialization is stopped");
return Err(Error::msg("Empty processes segment in config"));
}
info!("Noxis is configurating...");
//
let (tx_brd, mut rx_brd) = broadcast::channel::<Processes>(1);
// cli <-> config
let (tx_oneshot, rx_oneshot) = oneshot::channel::<Processes>();
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// is in need to send to the signals handler thread
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
for proc in processes.processes.iter() {
info!(
"Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
proc.name,
proc.path,
proc.dependencies.files.len(),
proc.dependencies.services.len()
);
// creating msg channel
// can or should be executed in new thread
let (tx, mut rx) = mpsc::channel::<u8>(1);
let proc = Arc::new(proc.clone());
let tx = Arc::new(tx.clone());
senders.push(Arc::clone(&tx.clone()));
let event = tokio::spawn(async move {
run_daemons(proc.clone(), tx.clone(), &mut rx).await;
});
handler.push(event);
}
// destructor addition
handler.push(tokio::spawn(async move {
if set_valid_destructor(Arc::new(senders)).await.is_err() {
error!("Linux signals handler creation failed. Terminating main thread...");
return;
// initilaizing task for config manipulations
let config_module = tokio::spawn(async move {
let _ = init_config_mechanism(
rx_oneshot,
tx_brd,
preboot.clone()
).await;
});
handler.push(config_module);
// initilaizing task for cli manipulation
let cli_module = tokio::spawn(async move {
if let Err(er) = init_cli_pipeline().await {
error!("CLI pipeline failed due to {}", er)
}
});
handler.push(cli_module);
tokio::time::sleep(Duration::from_millis(200)).await;
info!("End of job. Terminating main thread...");
// initilaizing task for deinitializing `Noxis`
let ctrlc = tokio::spawn(async move {
if let Err(er) = set_valid_destructor(vec![].into()).await {
error!("Destructor mod failed due to {}", er);
}
std::process::exit(0);
}));
});
handler.push(ctrlc);
// remote config update subscription
handler.push(tokio::spawn(async move {
let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
}));
// cli pipeline
handler.push(tokio::spawn(async move {
let _ = init_cli_pipeline().await;
}));
let monitoring = tokio::spawn(async move {
let config = {
let mut tick = tokio::time::interval(Duration::from_millis(500));
loop {
tick.tick().await;
break match rx_brd.try_recv() {
Ok(conf) => conf,
Err(_) => continue,
}
}
};
if let Err(er) = init_monitoring(config).await {
error!("Monitoring mod failed due to {}", er);
}
});
handler.push(monitoring);
for i in handler {
let _ = i.await;
}
// setting up redis connection \
// then conf checks to choose the most actual \
// let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
// error!("No actual configuration for runner. Stopping...");
// std::process::exit(1);
// });
//
// info!(
// "Current runner configuration: {}",
// &processes.date_of_creation
// );
// info!("Runner is ready. Initializing...");
//
// if processes.processes.is_empty() {
// error!("Processes list is null, runner-rs initialization is stopped");
// return Err(Error::msg("Empty processes segment in config"));
// }
// let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// // is in need to send to the signals handler thread
// let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
//
// for proc in processes.processes.iter() {
// info!(
// "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
// proc.name,
// proc.path,
// proc.dependencies.files.len(),
// proc.dependencies.services.len()
// );
//
// // creating msg channel
// // can or should be executed in new thread
// let (tx, mut rx) = mpsc::channel::<u8>(1);
// let proc = Arc::new(proc.clone());
// let tx = Arc::new(tx.clone());
//
// senders.push(Arc::clone(&tx.clone()));
//
// let event = tokio::spawn(async move {
// run_daemons(proc.clone(), tx.clone(), &mut rx).await;
// });
// handler.push(event);
// }
//
// // destructor addition
// handler.push(tokio::spawn(async move {
// if set_valid_destructor(Arc::new(senders)).await.is_err() {
// error!("Linux signals handler creation failed. Terminating main thread...");
// return;
// }
//
// tokio::time::sleep(Duration::from_millis(200)).await;
// info!("End of job. Terminating main thread...");
// std::process::exit(0);
// }));
//
// // remote config update subscription
// handler.push(tokio::spawn(async move {
// let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
// }));
//
// // cli pipeline
// handler.push(tokio::spawn(async move {
// let _ = init_cli_pipeline().await;
// }));
//
// for i in handler {
// let _ = i.await;
// }
Ok(())
}

View File

@ -1,12 +1,9 @@
use log::{error, info, warn};
use tokio::net::{TcpListener, TcpStream};
use anyhow::{Result as DynResult, Error};
use log::{error, info};
use tokio::net::{ UnixStream, UnixListener };
use tokio::time::{sleep, Duration};
use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}};
// use std::io::BufReader;
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
use std::fs;
use tokio::io::{ AsyncWriteExt, AsyncReadExt};
use noxis_cli::Cli;
use serde_json::from_str;
/// # Fn `init_cli_pipeline`
/// ## for catching all input requests from CLI
@ -21,49 +18,32 @@ use serde_json::from_str;
///
/// *depends on* : -
///
pub async fn init_cli_pipeline() -> DynResult<()> {
match init_listener().await {
Some(list) => {
pub async fn init_cli_pipeline() -> anyhow::Result<()> {
let socket_path = "noxis.sock";
let _ = fs::remove_file(socket_path);
match UnixListener::bind(socket_path) {
Ok(list) => {
// TODO: remove `unwrap`s
info!("Listening on {}", socket_path);
loop {
if let Ok((socket, addr)) = list.accept().await {
// isolation
if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() {
warn!("Declined attempt to connect TCP-socket from {}", addr);
continue;
}
process_connection(socket).await;
match list.accept().await {
Ok((socket, _)) => {
// tokio::spawn();
process_connection(socket).await;
},
Err(er) => {
error!("Cannot poll connection to CLI due to {}", er);
sleep(Duration::from_millis(300)).await;
},
}
sleep(Duration::from_millis(500)).await;
}
// Ok(())
},
None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use"))
}
}
/// # Fn `init_listener`
/// ## for creating TCP-listener for communicating with CLI
///
/// *input* : -
///
/// *output* : `Some<TcpListener>` if port 7753 was opened | None if not
///
/// *initiator* : fn `init_cli_pipeline`
///
/// *managing* : `TcpListener` object to handle requests
///
/// *depends on* : `tokio::net::TcpListener`
///
async fn init_listener() -> Option<TcpListener> {
match TcpListener::bind("127.0.0.1:7753").await {
Ok(listener) => {
info!("Runner is listening localhost:7753");
Some(listener)
Err(er) => {
error!("Failed to open UnixListener for CLI");
Err(er.into())
},
Err(_) => {
error!("Cannot create TCP listener for CLI");
None
}
}
}
@ -80,27 +60,29 @@ async fn init_listener() -> Option<TcpListener> {
///
/// *depends on* : `tokio::net::TcpStream`
///
async fn process_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(stream.borrow_mut());
let mut rqst = buf_reader.lines();
while let Ok(Some(line)) = rqst.next_line().await {
if line.is_empty() {
break
}
match from_str::<Cli>(&line) {
Ok(req) => {
// TODO: func wrapper
dbg!(req);
},
Err(_) => {
break
},
}
println!("{}", line);
async fn process_connection(mut stream: UnixStream) {
let mut buf = vec![0; 1024];
match stream.read(&mut buf).await {
Ok(0) => {
info!("Client disconnected ");
},
Ok(n) => {
buf.truncate(n);
info!("CLI have sent {} bytes", n);
match serde_json::from_slice::<Cli>(&buf) {
Ok(cli) => {
info!("Received CLI request: {:?}", cli);
let response = "OK";
if let Err(e) = stream.write_all(response.as_bytes()).await {
error!("Failed to send response: {}", e);
}
}
Err(e) => {
error!("Failed to parse CLI request: {}", e);
}
}
},
Err(e) => error!("Failed to read from socket: {}", e),
}
let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!";
stream.write_all(response.as_bytes()).await.unwrap();
let _ = stream.shutdown().await;
}

View File

@ -9,9 +9,393 @@ use std::sync::Arc;
use std::{env, fs};
use super::preboot::PrebootParams;
use tokio::time::{Duration, sleep};
// use redis::PubSub;
use tokio::sync::{
oneshot,
oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender },
broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver };
use crate::utils::files::create_watcher;
use std::fs::File;
use inotify::EventMask;
// const CONFIG_PATH: &str = "settings.json";
pub mod v2 {
use std::path::PathBuf;
use crate::utils::get_container_id;
use super::*;
pub async fn init_config_mechanism(
// to handle cli config changes
cli_oneshot: OneShotReciever<Processes>,
// to share local config with PRCS, CLI_PIPELINE and CONFIG modules
brd_tx : BroadcastSender<Processes>,
// preboot params (args)
params : Arc<PrebootParams>
/*...*/
) {
// channel for pubsub to handle local config pulling
let local_config_brd_reciever = brd_tx.subscribe();
// channel between pub-sub mech and local config mech
let (tx_pb_lc, rx_pb_lc) = oneshot::channel::<bool>();
// channel between cli mech and local config mech
let (tx_cli_lc, rx_cli_lc) = oneshot::channel::<bool>();
// dbg!("before lc");
let params_clone = params.clone();
let for_lc_path = params.clone();
let lc_path = for_lc_path
.config
.to_str()
.unwrap_or("settings.json");
// future to init work with local config
let lc_future = tokio::spawn(
// let params = params.clone();
local_config_reciever(
params_clone,
rx_pb_lc,
rx_cli_lc,
Arc::new(brd_tx)
)
);
// dbg!("before pb");
// future to init work with pub sub mechanism
let pubsub_future = tokio::spawn(
pubsub_config_reciever(
tx_pb_lc,
params.clone(),
local_config_brd_reciever
)
);
// dbg!("before cli");
// future to catch new configs from cli pipeline
let cli_future = tokio::spawn(
from_cli_config_reciever(
cli_oneshot,
tx_cli_lc
)
);
// let _ = lc_future.await;
// dbg!("before select");
tokio::select! {
lc_result = lc_future => {
// dbg!("end of lc");
match lc_result {
Ok(res) => {
if res.is_ok() {
info!("Local config warding mechanism stopped, waiting for others ...");
sleep(Duration::from_millis(500)).await;
let _ = restart_main_thread();
}
else {
error!("Local config warding mechanism crushed, restarting ...");
let _ = restart_main_thread();
}
},
Err(_) => {
error!("Local config warding mechanism crushed, restarting ...");
let _ = restart_main_thread();
},
}
},
pb_result = pubsub_future => {
match pb_result {
Ok(res) => {
if res.is_ok() {
info!("New config was saved locally, restarting ...");
}
else {
error!("Pubsub mechanism crushed, restarting ...");
}
},
Err(_) => {
error!("Pubsub mechanism crushed, restarting ...");
},
}
let _ = restart_main_thread();
},
cli_config_option = cli_future => {
match cli_config_option {
Err(_) => error!("CLI pulling new config mechanism crushed, restarting ..."),
Ok(option_config) => {
match option_config {
None => error!("CLI pulling new config mechanism crushed, restarting ..."),
Some(config) => {
info!("New config was pulled from CLI, saving and restarting ...");
let _ = save_new_config(&config, lc_path);
},
}
},
}
let _ = restart_main_thread();
},
}
// dbg!("after select");
// TODO! futures + select! [OK]
// TODO! tests config
}
pub async fn get_redis_connection(params: &str) -> Option<Connection> {
for i in 1..=3 {
let redis_url = format!("redis://{}/", params);
info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i);
if let Ok(client) = Client::open(redis_url) {
if let Ok(conn) = client.get_connection() {
info!("Successfully opened Redis connection");
return Some(conn);
}
}
error!("Error with subscribing Redis stream on update. Retrying in 5 secs...");
sleep(Duration::from_secs(5)).await;
}
None
}
// loop checking redis pubsub
async fn pubsub_config_reciever(
// to stop checking local config
local_conf_tx : OneShotSender<bool>,
params : Arc<PrebootParams>,
tx_brd_local : BroadcastReceiver<Processes>,
) -> anyhow::Result<()>{
/*...*/
// dbg!("start of pb");
let mut tx_brd_local = tx_brd_local;
let local_config = if !tx_brd_local.is_empty() {
tx_brd_local.recv().await?
} else {
// Processes::default()
let mut tick = tokio::time::interval(Duration::from_millis(500));
loop {
tick.tick().await;
break match tx_brd_local.recv().await {
Ok(conf) => conf,
Err(_) => continue,
};
}
};
match get_redis_connection(&local_config.config_server).await {
Some(mut conn) => {
let mut pub_sub = conn.as_pubsub();
let channel_name = get_container_id().unwrap_or(String::from("default"));
let channel_name = channel_name.trim();
match pub_sub.subscribe(channel_name) {
Err(er) => {
error!("Cannot subscribe pubsub channel due to {}", &er);
return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
},
Ok(_) => {
info!("Successfully subscribed to {} pubsub channel", channel_name);
let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3)));
loop {
if let Ok(msg) = pub_sub.get_message() {
// dbg!("ok on get message");
let payload : Result<String, _> = msg.get_payload();
match payload {
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
Ok(payload) => {
if let Some(remote) = parse_extern_config(&payload) {
match config_comparing(&local_config, &remote) {
ConfigActuality::Local => {
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
},
ConfigActuality::Remote => {
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
// to stop watching local config file mechanism
let _ = local_conf_tx.send(true);
let config_path = params.config.to_str().unwrap_or("settings.json");
if save_new_config(&remote, &config_path).is_err() {
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
return Err(anyhow::Error::msg(
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
))
}
return Ok(());
},
}
}
else {
warn!("Invalid config was pulled from Redis channel")
}
},
}
}
// delay
tokio::task::yield_now().await;
}
},
}
},
None => {
sleep(Duration::from_secs(20)).await;
}
}
Ok(())
}
//
async fn local_config_reciever(
params : Arc<PrebootParams>,
pubsub_oneshot : OneShotReciever<bool>,
cli_oneshot : OneShotReciever<bool>,
brd_tx : Arc<BroadcastSender<Processes>>,
/*...*/
) -> anyhow::Result<()> {
/*...*/
// shadowing as mut
let mut pubsub_oneshot = pubsub_oneshot;
let mut cli_oneshot = cli_oneshot;
// fill with default empty config, mut to change later
let mut _current_config = Processes::default();
// PathBuf to &str to work with local config path as slice
let local_config_path = params
.config
.to_str()
.unwrap_or("settings.json");
match load_processes(local_config_path) {
// if local exists
Some(conf) => {
info!("Local config `{}` was found.", &conf.date_of_creation);
_current_config = conf;
if let Err(er) = brd_tx.send(_current_config.clone()) {
error!("Cannot share local config with broadcast due to {}", er);
}
},
// if local is not exist
None => {
warn!("Local config wasn't found. Waiting for new ...");
return Err(anyhow::Error::msg("No local config"));
// ...
},
}
// 100% local exists here
// create watcher on local config file
match create_watcher("", local_config_path) {
Ok(mut watcher) => {
loop {
let mut need_to_export_config = false;
// let mut need_to_recreate_watcher = false;
// return situations here
// 1) oneshot signal
// 2) if config was deleted -> recreate and fill with current config that is held here
// 3) if config was changed -> fill with current config that is held here
// catching signal from pubsub
// it's because pubsub mech pulled new valid and actual config and now it's time to ...
// ... overwrite local config file and restart main thread
if let Ok(_) = pubsub_oneshot.try_recv() {
sleep(Duration::from_secs(1)).await;
return Ok(());
}
// catching signal from cli
// it's because cli mech pulled new valid and actual config and now it's time to ...
// ... overwrite local config file and restart main thread (like in previous mechanism)
if let Ok(_) = cli_oneshot.try_recv() {
sleep(Duration::from_secs(1)).await;
return Ok(());
}
// ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ...
// ! ... FROM WATCHER"S BUFFER
// existing check
if !params.config.exists() {
warn!("Local config file was deleted or moved. Recreating new one with saved data ...");
need_to_export_config = true;
// need_to_recreate_watcher = true;
} else {
// changes check
let mut buffer = [0; 128];
let events = watcher.read_events(&mut buffer);
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
if !events.is_empty() {
warn!("Local config file was overwritten. Discarding changes ...");
need_to_export_config = true;
// events
// .iter()
// .any(|event| *event == EventMask::DELETE_SELF)
// .then(|| need_to_recreate_watcher = true);
}
}
}
// exporting data
if need_to_export_config {
if let Err(er) = export_saved_config_data_locally(&params.config, &_current_config).await {
error!("Cannot save actual imported config due to {}", er);
} else {
// recreation watcher (draining activity buffer mechanism)
// if local config file was deleted and recreated
// if local config file was modified locally
match create_watcher("", local_config_path) {
Ok(new) => watcher = new,
Err(er) => error!("Cannot create new watcher due to {}", er),
}
}
}
sleep(Duration::from_millis(300)).await;
// tokio::task::yield_now().await;
}
},
Err(_) => {
error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path);
return Err(anyhow::Error::msg("Cannot create watcher on local config file"));
},
}
}
// [:IN-TEST]
async fn from_cli_config_reciever(
cli_oneshot: OneShotReciever<Processes>,
to_local_tx: OneShotSender<bool>
) -> Option<Processes> {
/* match awaits til channel*/
// dbg!("start of cli");
loop {
if !cli_oneshot.is_empty() {
match cli_oneshot.await {
Ok(config_from_cli) => {
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
let _ = to_local_tx.send(true);
return Some(config_from_cli)
},
_ => return None,
}
}
sleep(Duration::from_millis(300)).await;
}
}
async fn export_saved_config_data_locally(
config_file_path: &PathBuf,
current_config: &Processes
) -> anyhow::Result<()> {
let mut file = File::create(config_file_path)?;
file.write_all(
serde_json::to_string_pretty(current_config)?.as_bytes()
)?;
Ok(())
// Ok(())
}
}
/// # Fn `load_processes`
/// ## for reading and parsing *local* storing config
///
@ -54,14 +438,14 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
error!("Invalid character in config file. Config path was set to default");
"settings.json"
});
info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url);
info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url);
match load_processes(config_path) {
Some(local_conf) => {
info!(
"Found local configuration, version - {}",
&local_conf.date_of_creation
);
if !params.no_remote_config {
if !params.no_sub {
if let Some(remote_conf) =
// TODO : rework with pubsub mech
once_get_remote_configuration(&format!("redis://{}/", &params.remote_server_url))
@ -85,7 +469,7 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
}
None => {
warn!("No local valid conf was found. Trying to pull remote one...");
if !params.no_remote_config {
if !params.no_sub {
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", &params.remote_server_url)));
if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
@ -322,7 +706,7 @@ fn restart_main_thread() -> std::io::Result<()> {
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
if params.no_sub || params.no_remote_config {
if params.no_sub {
return Err(CustomError::Fatal);
}
if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) {
@ -397,6 +781,9 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<Pr
/// *depends on* : `Processes`, `ConfigActuality`
///
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
if local.is_default() {
return ConfigActuality::Remote;
}
let local_date: u64 = local.date_of_creation.parse().unwrap();
let remote_date: u64 = remote.date_of_creation.parse().unwrap();

View File

@ -13,7 +13,7 @@ enum EnvVars {
NoxisNoHagent,
NoxisNoLogs,
NoxisRefreshLogs,
NoxisNoRemoteConfig,
// NoxisNoRemoteConfig,
NoxisNoConfigSub,
NoxisSocketPath,
NoxisLogTo,
@ -29,7 +29,7 @@ impl std::fmt::Display for EnvVars {
EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"),
EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"),
EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"),
EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"),
// EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"),
EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"),
EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"),
EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"),
@ -48,7 +48,7 @@ impl<'a> EnvVars {
EnvVars::NoxisNoHagent => "false",
EnvVars::NoxisNoLogs => "false",
EnvVars::NoxisRefreshLogs => "false",
EnvVars::NoxisNoRemoteConfig => "false",
// EnvVars::NoxisNoRemoteConfig => "false",
EnvVars::NoxisNoConfigSub => "false",
EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock",
EnvVars::NoxisLogTo => "./",
@ -77,7 +77,7 @@ impl<'a> EnvVars {
Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string());
Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string());
Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string());
Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string());
// Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string());
Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string());
Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap());
Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap());
@ -147,12 +147,6 @@ impl std::fmt::Display for MetricsPrebootParams {
/// noxis-rs ... --refresh-logs ...
/// ```
///
/// `--no-remote-config` - to disable work with Redis as config producer
/// ### usage :
/// ``` bash
/// noxis-rs ... --no-remote-config ...
/// ```
///
/// `--no-sub` - to disable Redis subscribtion mechanism
/// ### usage :
/// ``` bash
@ -212,17 +206,18 @@ pub struct PrebootParams {
help="To clear logs directory"
)]
pub refresh_logs : bool,
#[arg(
long = "no-remote-config",
action,
help="To disable work with remote config server",
conflicts_with="no_sub")]
pub no_remote_config : bool,
// #[arg(
// long = "no-remote-config",
// action,
// help="To disable work with remote config server",
// conflicts_with="no_sub")]
// pub no_remote_config : bool,
#[arg(
long = "no-sub",
action,
help="To disable subscription mechanism",
conflicts_with="no_remote_config")]
help="To disable Redis subscription mechanism",
)]
// conflicts_with="no_remote_config"
pub no_sub : bool,
// params (socket_path, log_to, remote_server_url, config)
@ -243,7 +238,7 @@ pub struct PrebootParams {
#[arg(
long = "remote-server-url",
default_value="localhost",
conflicts_with="no_remote_config",
conflicts_with="no_sub",
help = "To set url of remote config server using in remote config pulling mechanism"
)]
pub remote_server_url : String,
@ -288,15 +283,17 @@ impl PrebootParams {
// existing log dir
if !self.log_to.exists() && !self.no_logs {
eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default");
self.refresh_logs = false;
self.log_to = PathBuf::from("./");
// return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
}
// existing sock file
if !self.config.exists() {
eprintln!("Error: Invalid character in config file. Config path was set to default");
let config = PathBuf::from("/etc/settings.json");
if !config.exists() && self.no_remote_config {
return Err(Error::msg("Noxis cannot run without config. Create local config or enable remote-config mechanism"));
// TODO : ??? wtf is going with 2 paths
let config = PathBuf::from("/etc/enode/noxis/settings.json");
if !config.exists() && self.no_sub {
return Err(Error::msg("Noxis cannot run without config. Create local config or enable pubsub mechanism"));
}
self.config = PathBuf::from("settings.json");
// return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
@ -353,20 +350,20 @@ mod preboot_unitests{
"runner-rs",
"--no-sub",
"--remote-server-url", "redis://127.0.0.1"
]).is_ok())
}
#[test]
fn parsing_config_invalid_args_noremote_nosub() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--no-remote-config", "--no-sub"
]).is_err())
}
// #[test]
// fn parsing_config_invalid_args_noremote_nosub() {
// assert!(PrebootParams::try_parse_from(vec![
// "runner-rs",
// "--no-remote-config", "--no-sub"
// ]).is_err())
// }
#[test]
fn parsing_config_invalid_args_noremote_remoteurl() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--no-remote-config",
"--no-sub",
"--remote-server-url", "redis://127.0.0.1"
]).is_err())
}

View File

@ -22,7 +22,7 @@ type SendersVec = Arc<Vec<Arc<mpsc::Sender<u8>>>>;
///
/// *depends on* : Sig, Signals
///
pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError> {
pub async fn set_valid_destructor(senders: SendersVec) -> anyhow::Result<()> {
let (mut int, mut term, mut stop) = (
Sig::new(Signals::Sigint, senders.clone()),
Sig::new(Signals::Sigterm, senders.clone()),

View File

@ -2,7 +2,114 @@
use std::net::Ipv4Addr;
use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use std::sync::Arc;
#[derive(Debug)]
pub enum DependencyType {
File,
Service,
}
#[derive(Debug)]
pub enum ServiceState {
Ok,
Unavailable
}
pub struct ServiceWaitConfig(u32);
impl Default for ServiceWaitConfig {
fn default() -> Self {
Self(5)
}
}
pub enum FileTriggerType {
OnChange,
OnDelete,
}
impl std::fmt::Display for FileTriggerType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return match self {
FileTriggerType::OnChange => write!(f, "File was changed"),
FileTriggerType::OnDelete => write!(f, "File was moved or deleted"),
}
}
}
impl<'a> FileTriggerType {
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger)),
}
}
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger.on_delete.clone())),
}
}
}
#[derive(Debug)]
pub enum Triggers {
File { on_change: Arc<str>, on_delete: Arc<str> },
Service {on_lost: Arc<str>, wait: u32},
}
impl Triggers {
pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers {
Triggers::File { on_change, on_delete }
}
pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers {
Triggers::Service{on_lost, wait: wait_time}
}
pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> {
if let Triggers::Service { on_lost, .. } = self {
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone())))
}
None
}
}
#[derive(Debug)]
pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> }
pub struct ServiceTriggersForController(Arc<str>);
impl std::fmt::Display for DependencyType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return match self {
DependencyType::File => write!(f, "File"),
DependencyType::Service => write!(f, "Service"),
}
}
}
#[derive(Debug)]
pub enum ProcessState {
Pending,
Holding,
Stopped,
StoppedByCli,
}
#[derive(Debug)]
pub enum Events {
Positive(Arc<str>),
Negative(NegativeOutcomes)
}
#[derive(Debug)]
pub enum NegativeOutcomes {
FileWasChanged(Arc<str>, DependencyType, Arc<str>),
FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>),
ServiceIsUnreachable(Arc<str>, DependencyType, Arc<str>),
}
#[async_trait]
pub trait ProcessUnit {
async fn process(&mut self);
}
/// # an Error enum (next will be deleted and replaced)
pub enum CustomError {
Fatal,
@ -40,6 +147,22 @@ pub struct Processes {
pub processes: Vec<TrackingProcess>,
}
impl Default for Processes {
fn default() -> Self {
Self {
date_of_creation : String::new(),
config_server : String::from("default"),
processes : Vec::new(),
}
}
}
impl Processes {
pub fn is_default(&self) -> bool {
self.date_of_creation.is_empty()
}
}
/// # Struct for the 2nd level in json conf file
/// ## for each process to contain info, such as name, path and dependencies
///
@ -135,7 +258,7 @@ pub struct Files {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Services {
pub hostname: String,
pub port: u32,
pub port: Option<u32>,
pub triggers: ServiceTriggers,
}
@ -159,7 +282,6 @@ pub struct Services {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServiceTriggers {
pub wait: u32,
pub delay: u32,
#[serde(rename = "onLost")]
pub on_lost: String,
}

View File

@ -6,25 +6,207 @@ pub mod services;
// TODO : saving current flags state
use crate::options::structs::CustomError;
use crate::options::structs::TrackingProcess;
use files::create_watcher;
use files::file_handler;
use inotify::Inotify;
use log::{error, warn};
use crate::options::structs::{CustomError, TrackingProcess, Processes};
// use files::create_watcher;
// use files::file_handler;
// use inotify::Inotify;
use log::{error, warn, info};
use prcs::{
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
unfreeze_process,
};
use services::service_handler;
// use services::service_handler;
use std::process::Command;
use std::sync::Arc;
use tokio::join;
// use tokio::join;
use tokio::sync::mpsc;
use tokio::time::Duration;
// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
// controllers import
use prcs::v2::ProcessesController;
use files::v2::FilesController;
use services::v2::ServicesController;
use async_trait::async_trait;
const GET_ID_CMD: &str = "hostname";
pub mod v2 {
use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
use super::*;
#[derive(Debug)]
enum ControllerResult {
Process(Option<ProcessesController>),
File(Option<FilesController>),
Service(Option<ServicesController>),
}
#[derive(Debug)]
struct Supervisor {
prcs : LinkedList<ProcessesController>,
files : LinkedList<FilesController>,
services : LinkedList<ServicesController>,
}
impl Supervisor {
pub fn new() -> Supervisor {
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
}
pub async fn with_config(mut self, config: &Processes) -> Supervisor {
let _ = config.processes.iter()
.for_each(|prc| {
let (rx, tx) = mpsc::channel::<Events>(10);
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
if !self.prcs.contains(&temp) {
self.prcs.push_back(temp);
}
let rx = Arc::new(rx);
let proc_name: Arc<str> = Arc::from(prc.name.clone());
let _ = prc.dependencies.files.iter()
.for_each(|file| {
let mut hm = HashMap::new();
let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())};
hm.insert(proc_name.clone(), (triggers, rx.clone()));
let tempfile = FilesController::new(&file.filename.as_str(), hm)
.with_path(&file.src);
if let Ok(file) = tempfile {
if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) {
current_file.add_event(file);
} else {
self.files.push_back(file);
}
}
});
// servs
let _ = prc.dependencies.services.iter()
.for_each(|serv| {
let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref());
// preparations
let rx = rx.clone();
let serv_cont = ServicesController::new().with_access_name(
&serv.hostname,
&access_url
);
// triggers
let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone());
let triggers = Triggers::new_service(arc, serv.triggers.wait);
if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
proc.add_process(&prc.name, triggers, rx);
} else {
// vecdeque for queue
let mut vec: VecDeque<Arc<str>> = VecDeque::new();
vec.push_back(proc_name.clone());
// connection_queue
let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> = BTreeMap::new();
connection_queue.insert(serv.triggers.wait, vec);
// event_reg
let mut hm = HashMap::new();
hm.insert(proc_name.clone(), (triggers, rx));
let serv_cont = serv_cont.with_params(connection_queue, hm);
self.services.push_back(serv_cont);
}
});
});
self
}
pub fn get_stats(&self) -> String {
format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
}
}
#[async_trait]
impl ProcessUnit for Supervisor {
async fn process(&mut self) {
info!("Initializing monitoring ...");
loop {
// dbg!(&self);
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
// let res = tokio::join!(prc.process(), file.process(), serv.process());
if let Some(mut val) = self.prcs.pop_front() {
tasks.push(
tokio::spawn( async move {
val.process().await;
ControllerResult::Process(Some(val))
})
);
}
if let Some(mut val) = self.files.pop_front() {
tasks.push(
tokio::spawn( async move {
val.process().await;
ControllerResult::File(Some(val))
})
);
}
if let Some(mut val) = self.services.pop_front() {
tasks.push(
tokio::spawn( async move {
val.process().await;
ControllerResult::Service(Some(val))
})
);
}
for task in tasks {
match task.await {
Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val),
Ok(ControllerResult::File(Some(val))) => self.files.push_back(val),
Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val),
Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."),
_ => { /* DEAD END (CAN NOT BE EXECUTED) */},
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
// spawn tasks
// spawn prc
// spawn files
// spawn services
// ## for ... i.await in loop
pub async fn init_monitoring(
config: Processes
) -> anyhow::Result<()> {
let mut supervisor = Supervisor::new().with_config(&config).await;
info!("Monitoring: {} ", &supervisor.get_stats());
supervisor.process().await;
Ok(())
}
// async fn generate_controllers<'a>(config: Processes) -> (HashSet<ProcessesController<'a>>, HashSet<FilesController<'a>>, HashSet<ServicesController<'a>>) {
// let mut prcs: HashSet<ProcessesController<'a>> = HashSet::new();
// let mut files: HashSet<FilesController<'a>> = HashSet::new();
// let mut services: HashSet<ServicesController<'a>> = HashSet::new();
// for prc in config.processes {
// let (rx, tx) = mpsc::channel::<Events<'a>>(10);
// // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path);
// let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path);
// let a = new_prc.process().await;
// }
// (prcs, files, services)
// }
// spawn prc check with semaphore check
async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) }
// spawn file check with semaphore check
async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) }
// spawn service check with semaphore check
async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) }
}
/// # Fn `run_daemons`
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
///
@ -40,37 +222,37 @@ const GET_ID_CMD: &str = "hostname";
///
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
///
pub async fn run_daemons(
proc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>,
rx: &mut mpsc::Receiver<u8>,
) {
// creating watchers + ---buffers---
let mut watchers: Vec<Inotify> = vec![];
for file in proc.dependencies.files.clone().into_iter() {
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
watchers.push(watcher);
} else {
let _ = tx.send(121).await;
}
// watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
}
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
Arc::new(tokio::sync::Mutex::new(watchers));
// pub async fn run_daemons(
// proc: Arc<TrackingProcess>,
// tx: Arc<mpsc::Sender<u8>>,
// rx: &mut mpsc::Receiver<u8>,
// ) {
// // creating watchers + ---buffers---
// let mut watchers: Vec<Inotify> = vec![];
// for file in proc.dependencies.files.clone().into_iter() {
// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
// watchers.push(watcher);
// } else {
// let _ = tx.send(121).await;
// }
// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
// }
// let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
// Arc::new(tokio::sync::Mutex::new(watchers));
loop {
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
tokio::select! {
_ = run_hand => continue,
_val = rx.recv() => {
if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
return;
}
},
}
tokio::task::yield_now().await;
}
}
// loop {
// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
// tokio::select! {
// _ = run_hand => continue,
// _val = rx.recv() => {
// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
// return;
// }
// },
// }
// tokio::task::yield_now().await;
// }
// }
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
match val {
@ -133,7 +315,6 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
},
// // 9 - File-dependency change -> staying (after check)
9 => {
// no need to trash logs
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
tokio::time::sleep(Duration::from_millis(100)).await;
},
@ -201,36 +382,36 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
///
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
///
pub async fn running_handler(
prc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) {
// services and files check (once)
let files_check = file_handler(
&prc.name,
&prc.dependencies.files,
tx.clone(),
watchers.clone(),
);
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
// pub async fn running_handler(
// prc: Arc<TrackingProcess>,
// tx: Arc<mpsc::Sender<u8>>,
// watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
// ) {
// // services and files check (once)
// let files_check = file_handler(
// &prc.name,
// &prc.dependencies.files,
// tx.clone(),
// watchers.clone(),
// );
// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
let res = join!(files_check, services_check);
// if inactive -> spawn checks -> active is true
if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
if start_process(&prc.name, &prc.path).await.is_err() {
tx.send(3).await.unwrap();
return;
}
}
// if frozen -> spawn checks -> unfreeze is true
else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
tx.send(10).await.unwrap();
return;
}
// tokio::time::sleep(Duration::from_millis(100)).await;
tokio::task::yield_now().await;
}
// let res = join!(files_check, services_check);
// // if inactive -> spawn checks -> active is true
// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
// if start_process(&prc.name, &prc.path).await.is_err() {
// tx.send(3).await.unwrap();
// return;
// }
// }
// // if frozen -> spawn checks -> unfreeze is true
// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
// tx.send(10).await.unwrap();
// return;
// }
// // tokio::time::sleep(Duration::from_millis(100)).await;
// tokio::task::yield_now().await;
// }
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
/// # Fn `get_container_id`

View File

@ -1,183 +1,321 @@
use crate::options::structs::{CustomError, Files};
use super::prcs::{is_active, is_frozen};
use inotify::{EventMask, Inotify, WatchMask};
use std::borrow::BorrowMut;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::Duration;
use crate::options::structs::{CustomError, Files};
use super::prcs::{is_active, is_frozen};
use inotify::{EventMask, Inotify, WatchMask};
use std::borrow::BorrowMut;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender as Sender;
use tokio::time::Duration;
use crate::options::structs::Events;
use async_trait::async_trait;
/// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events
///
/// *input* : `&str`, `&str`
///
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
///
/// *managing* : current file's name: &str, path in local storage to current file: &str
///
/// *depends on* : -
///
pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
Ok(inotify)
}
pub mod v2 {
use log::{error, info, warn};
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
use super::*;
use std::{collections::HashMap, path::Path};
/// # Fn `create_watcher`
/// ## for managing processes by checking dep files' states
///
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *depends on* : Files
///
pub async fn file_handler(
name: &str,
files: &[Files],
tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> Result<(), CustomError> {
for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() {
if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal);
type MpscSender = Arc<Sender<Events>>;
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
#[derive(Debug)]
enum FileState {
Ok,
NotFound,
}
#[derive(Debug)]
pub struct FilesController {
name : Arc<str>,
path : String,
code_name : Arc<str>,
state : FileState,
watcher : Option<Inotify>,
triggers : EventHandlers,
}
impl PartialEq for FilesController {
fn eq(&self, other: &Self) -> bool {
self.code_name == other.code_name
}
match file.triggers.on_delete.as_str() {
"stay" => {
tx.send(9).await.unwrap();
continue;
}
"stop" => {
if is_active(name).await {
tx.send(1).await.unwrap();
}
return Err(CustomError::Fatal);
}
"hold" => {
if is_active(name).await {
tx.send(2).await.unwrap();
return Err(CustomError::Fatal);
}
}
_ => {
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap();
return Err(CustomError::Fatal);
}
impl FilesController {
pub fn new(name: &str, triggers: EventHandlers) -> FilesController {
let name: Arc<str> = Arc::from(name);
Self {
name : name.clone(),
path : String::new(),
state : FileState::Ok,
watcher : None,
triggers,
code_name : name.clone(),
}
}
} else if is_active(name).await && !is_frozen(name).await {
let watchers = watchers.clone();
// println!("mutex: {:?}", watchers);
let mut buffer = [0; 128];
let mut mutex_guard = watchers.lock().await;
if let Some(notify) = mutex_guard.get_mut(i) {
let events = notify.read_events(&mut buffer);
// println!("{:?}", events);
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
for event in events {
if let EventMask::DELETE_SELF = event {
// ! warning (DELETE_SELF event) !
// println!("! warning (DELETE_SELF event) !");
// * watcher recreation after dealing with file recreation mechanism in text editors
let mutex = notify.borrow_mut();
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
*mutex = watcher;
}
pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController> {
self.path = path.as_ref().to_string_lossy().into_owned();
self.watcher = {
match create_watcher(&self.name, &self.path) {
Ok(val) => Some(val),
Err(er) => {
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
return Err(er)
}
match file.triggers.on_change.as_str() {
"stop" => {
let _ = tx.send(7).await;
}
};
self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name));
Ok(self)
}
pub fn add_event(&mut self, file_controller : FilesController) {
for (k, v) in file_controller.triggers {
self.triggers.entry(k).or_insert(v);
}
}
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
for (prc_name, (triggers, channel)) in &self.triggers {
let msg = match &trigger_type {
None => {
Events::Positive(self.code_name.clone())
},
Some(event) => {
info!("Event on file {} ({}) : {}. Notifying `{}` ...", &self.name, &self.path, event, &prc_name);
event.event_from_file_trigger_controller(self.code_name.clone(), &triggers)
},
};
let _ = channel.send(msg).await;
}
}
}
#[async_trait]
impl ProcessUnit for FilesController {
async fn process(&mut self) {
// polling file check
// 1) existing check
// dbg!(&self);
if let Ok(_) = check_file(&self.name, &self.path).await {
if let FileState::NotFound = self.state {
info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name);
self.state = FileState::Ok;
// reseting negative outcome in prc
self.trigger_on(None).await;
}
match &mut self.watcher {
Some(notify) => {
let mut buffer = [0; 1024];
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
// notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);});
// todo!();
if let (recreate_watcher, true) = (
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
notif_events.any(|mask| mask.mask == EventMask::MODIFY)
) {
warn!("File {} ({}) was changed", self.name, &self.path);
if recreate_watcher {
self.watcher = match create_watcher(&self.name, &self.path) {
Ok(notifier) => Some(notifier),
Err(er) => {
error!("Failed to recreate watcher for {} ({}) due to {}",
self.name,
&self.path,
er
);
None
},
}
}
self.trigger_on(Some(FileTriggerType::OnChange)).await;
return;
}
}
"restart" => {
let _ = tx.send(8).await;
},
None => { /* DEAD END */},
}
} else {
if let FileState::Ok = self.state {
warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
self.state = FileState::NotFound;
self.trigger_on(Some(FileTriggerType::OnDelete)).await;
}
return;
}
self.trigger_on(None).await;
// 2) change check
}
}
}
/// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events
///
/// *input* : `&str`, `&str`
///
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
///
/// *managing* : current file's name: &str, path in local storage to current file: &str
///
/// *depends on* : -
///
pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
Ok(inotify)
}
/// # Fn `create_watcher`
/// ## for managing processes by checking dep files' states
///
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *depends on* : Files
///
pub async fn file_handler(
name: &str,
files: &[Files],
tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> anyhow::Result<()> {
for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() {
if !is_active(name).await || is_frozen(name).await {
return Err(anyhow::Error::msg("Process is frozen or stopped"));
}
match file.triggers.on_delete.as_str() {
"stay" => {
tx.send(9).await.unwrap();
continue;
}
"stop" => {
if is_active(name).await {
tx.send(1).await.unwrap();
}
return Err(anyhow::Error::msg("Process was stopped"));
}
"hold" => {
if is_active(name).await {
tx.send(2).await.unwrap();
return Err(anyhow::Error::msg("Process was frozen"));
}
}
_ => {
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap();
return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
}
}
} else if is_active(name).await && !is_frozen(name).await {
let watchers = watchers.clone();
// println!("mutex: {:?}", watchers);
let mut buffer = [0; 128];
let mut mutex_guard = watchers.lock().await;
if let Some(notify) = mutex_guard.get_mut(i) {
let events = notify.read_events(&mut buffer);
// println!("{:?}", events);
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
for event in events {
if let EventMask::DELETE_SELF = event {
// ! warning (DELETE_SELF event) !
// println!("! warning (DELETE_SELF event) !");
// * watcher recreation after dealing with file recreation mechanism in text editors
let mutex = notify.borrow_mut();
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
if let Ok(watcher) = create_watcher(&file.filename, &file.src) {
*mutex = watcher;
}
}
"stay" => {
let _ = tx.send(9).await;
}
_ => {
let _ = tx.send(101).await;
match file.triggers.on_change.as_str() {
"stop" => {
let _ = tx.send(7).await;
}
"restart" => {
let _ = tx.send(8).await;
}
"stay" => {
let _ = tx.send(9).await;
}
_ => {
let _ = tx.send(101).await;
}
}
}
}
}
}
}
tokio::task::yield_now().await;
Ok(())
}
tokio::task::yield_now().await;
Ok(())
}
/// # Fn `check_file`
/// ## for checking existance of current file
///
/// *input* : `&str`, `&str`
///
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
///
/// *initiator* : fn `file_handler`
///
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
///
/// *depends on* : network activity
///
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string());
tokio::task::spawn_blocking(move || {
let file_concat = format!("{}{}", arc_path, arc_name);
let path = Path::new(&file_concat);
if path.exists() {
Ok(())
} else {
Err(CustomError::Fatal)
/// # Fn `check_file`
/// ## for checking existance of current file
///
/// *input* : `&str`, `&str`
///
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
///
/// *initiator* : fn `file_handler`
///
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
///
/// *depends on* : network activity
///
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string());
tokio::task::spawn_blocking(move || {
let file_concat = format!("{}{}", arc_path, arc_name);
let path = Path::new(&file_concat);
if path.exists() {
Ok(())
} else {
Err(CustomError::Fatal)
}
})
.await
.unwrap_or_else(|_| {
panic!("Corrupted while file check process");
})
}
#[cfg(test)]
mod files_unittests {
use super::*;
#[tokio::test]
async fn try_to_create_watcher() {
let res = create_watcher("dep-file", "./tests/examples/");
assert!(res.is_ok());
}
#[tokio::test]
async fn try_to_create_invalid_watcher() {
let res = create_watcher("invalid-file", "/path/to/the/no/dir");
assert!(res.is_err());
}
#[tokio::test]
async fn check_existing_file() {
let res = check_file("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn check_non_existing_file() {
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
})
.await
.unwrap_or_else(|_| {
panic!("Corrupted while file check process");
})
}
#[cfg(test)]
mod files_unittests {
use super::*;
#[tokio::test]
async fn try_to_create_watcher() {
let res = create_watcher("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn try_to_create_invalid_watcher() {
let res = create_watcher("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
#[tokio::test]
async fn check_existing_file() {
let res = check_file("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn check_non_existing_file() {
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
}

View File

@ -1,8 +1,132 @@
use crate::options::structs::CustomError;
use log::{error, warn};
use std::process::{Command, Output};
use std::sync::Arc;
use tokio::time::Duration;
use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit};
use std::collections::HashSet;
use tokio::sync::mpsc::Receiver as MpscReciever;
use async_trait::async_trait;
pub mod v2 {
use log::info;
use crate::options::structs::DependencyType;
use std::path::Path;
use super::*;
#[derive(Debug)]
pub struct ProcessesController {
name: Arc<str>,
bin: String,
// obj: Arc<TrackingProcess>,
state: ProcessState,
event_reader: MpscReciever<Events>,
negative_events: HashSet<Arc<str>>,
}
impl PartialEq for ProcessesController {
fn eq(&self, other: &Self) -> bool {
self.bin == other.bin
}
}
impl ProcessesController {
pub fn new(name: &str, event_reader: MpscReciever<Events>) -> ProcessesController {
ProcessesController {
name : Arc::from(name),
bin : String::new(),
state : ProcessState::Stopped,
event_reader,
negative_events : HashSet::new(),
}
}
pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController {
self.bin = bin.as_ref().to_string_lossy().into_owned();
self
}
async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
match trigger {
"stay" => {
info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name);
},
"stop" => {
if is_active(&self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
terminate_process(&self.name).await;
self.state = ProcessState::Stopped;
}
},
"hold" => {
if !is_frozen(&self.name).await {
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
freeze_process(&self.name).await;
self.state = ProcessState::Holding;
}
},
"restart" => {
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
let _ = restart_process(&self.name, &self.bin).await;
},
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
}
tokio::time::sleep(Duration::from_micros(100)).await;
}
}
#[async_trait]
impl ProcessUnit for ProcessesController {
async fn process(&mut self) {
if self.negative_events.len() == 0 {
match self.state {
ProcessState::Holding => {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
if let Err(er) = unfreeze_process(&self.name).await {
error!("Cannot unfreeze process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
ProcessState::Stopped => {
info!("No negative dependecies events on {} process. Starting ...", self.name);
if let Err(er) = start_process(&self.name, &self.bin).await {
error!("Cannot start process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
_ => {},
}
}
while let Ok(event) = self.event_reader.try_recv() {
match event {
Events::Positive(target) => {
if self.negative_events.contains(&target) {
self.negative_events.remove(&target);
}
},
Events::Negative(event) => {
match event {
NegativeOutcomes::FileWasChanged(target, dep_type, trigger) |
NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) |
NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
if !self.negative_events.contains(&target) {
self.negative_events.insert(target.clone());
self.trigger_on(
&target,
&trigger,
dep_type
).await;
}
},
}
},
}
}
}
}
}
/// # Fn `get_pid`
/// ## for initializing process of unstoppable grubbing metrics.
@ -162,14 +286,11 @@ pub async fn freeze_process(name: &str) {
///
/// *depends on* : -
///
pub async fn unfreeze_process(name: &str) {
pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> {
let _ = Command::new("pkill")
.args(["-CONT", name])
.output()
.unwrap_or_else(|_| {
error!("Failed to unfreeze process");
std::process::exit(101);
});
.output()?;
Ok(())
}
/// # Fn `restart_process`
@ -185,7 +306,7 @@ pub async fn unfreeze_process(name: &str) {
///
/// *depends on* : fn `start_process`, fn `terminate_process`
///
pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> {
pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> {
terminate_process(name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
start_process(name, path).await
@ -204,7 +325,7 @@ pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError>
///
/// *depends on* : -
///
pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> {
// let runsh = format!("{} {}", "exec", path);
let mut command = Command::new(path);
// command.arg(path);
@ -215,8 +336,7 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
Ok(())
}
Err(er) => {
println!("{:?}", er);
Err(CustomError::Fatal)
Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er)))
}
}
}

View File

@ -1,10 +1,193 @@
use crate::options::structs::{CustomError, Services};
use super::prcs::{is_active, is_frozen};
use crate::options::structs::CustomError;
use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use tokio::time::Duration;
use tokio::sync::mpsc::Sender as Sender;
use async_trait::async_trait;
pub mod v2 {
use log::info;
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
use super::*;
use std::collections::{HashMap, BTreeMap, VecDeque};
type MpscSender = Arc<Sender<Events>>;
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
// type wrapper for service wait queue
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
#[derive(Debug)]
pub struct ServicesController {
// i.e. yandex.ru
#[allow(unused)]
name : String,
// i.e. yandex.ru:443
access_url : Arc<str>,
// "OK" or "Unavailable"
state: ServiceState,
// btree map with key as max wait time and it's key to hashmap
config: ConnectionQueue,
// Map of processes with their (trigger and mpsc sender)
event_registrator : EventHandlers,
}
impl PartialEq for ServicesController {
fn eq(&self, other: &Self) -> bool {
self.access_url == other.access_url
}
}
impl ServicesController {
pub fn new() -> ServicesController {
ServicesController {
name : String::new(),
access_url : Arc::from(String::new()),
state : ServiceState::Unavailable,
config: ConnectionQueue::new(),
event_registrator : EventHandlers::new(),
}
}
pub fn with_access_name(
mut self,
hostname: &str,
access_url: &str,
) -> ServicesController {
self.name = hostname.to_string();
self.access_url = Arc::from(access_url);
self
}
pub fn with_params(
mut self,
conn_queue: ConnectionQueue,
event_reg: EventHandlers,
) -> ServicesController {
self.config = conn_queue;
self.event_registrator = event_reg;
self
}
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
}
pub fn add_process(
&mut self,
proc_name: &str,
trigger: Triggers,
sender: MpscSender,
) {
let proc_name: Arc<str> = Arc::from(proc_name);
// queue add
if let Triggers::Service { wait, .. } = trigger {
self.config.entry(wait)
.and_modify(|el| el.push_back(proc_name.clone()))
.or_insert({
let mut temp = VecDeque::new();
temp.push_back(proc_name.clone());
temp
});
}
// event add
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
}
async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?;
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
}
Ok(())
}
async fn trigger_on(&mut self) {
match self.state {
ServiceState::Ok => {
let _ = self.event_registrator
.iter()
.map(|(_, (_, el))| async {
let _ = el.send(Events::Positive(self.access_url.clone())).await;
});
},
ServiceState::Unavailable => {
// looped check and notifying
self.looped_check().await;
},
}
}
async fn looped_check(self: &mut Self) {
let longest = self.config.last_entry().unwrap();
let longest = longest.key();
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
let timer = tokio::time::Instant::now();
let mut attempt: u32 = 1;
let access_url = Arc::new(self.access_url.clone());
// let event_registrator = &mut self.event_registrator;
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
// let access_url = access_url.clone();
loop {
interapter.tick().await;
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
attempt += 1;
let state_check_result = self.check_state().await;
if state_check_result.is_ok() {
info!("Connection to {} is `OK` now", &access_url);
self.state = ServiceState::Ok;
break;
} else {
let now = timer.elapsed();
let iterator = self.config.iter()
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
.flat_map(|(_, a)| a.iter().cloned())
.collect::<VecDeque<Arc<str>>>();
for name in iterator {
let proc_name = name.to_string();
info!("Trying to notify process `{}` ...", &proc_name);
let sender_opt = self.event_registrator.get(&name)
.map(|(trigger, sender)|
(trigger.to_service_negative_event(name.clone()), sender)
);
if let Some((tr, tx)) = sender_opt {
let _ = tx.send(tr.unwrap()).await;
} else {
error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url)
}
}
}
}
}).await {
info!("Timeout of establishing connection to {}. ", &access_url);
}
}
}
#[async_trait]
impl ProcessUnit for ServicesController {
async fn process(&mut self) {
// check_service(hostname, port)
let current_state = self.check_state().await;
match (&self.state, current_state) {
(ServiceState::Unavailable, Ok(_)) => {
warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
self.state = ServiceState::Ok;
self.trigger_on().await;
},
(ServiceState::Ok, Err(_)) => {
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
self.state = ServiceState::Unavailable;
self.trigger_on().await;
},
(ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url),
_ => { /* DEAD END WITH NO INTEREST */ },
}
}
}
}
/// # Fn `service_handler`
/// ## function to realize mechanism of current process' dep services monitoring
@ -19,53 +202,53 @@ use tokio::time::{Duration, Instant};
///
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
///
pub async fn service_handler(
name: &str,
services: &Vec<Services>,
tx: Arc<mpsc::Sender<u8>>,
) -> Result<(), CustomError> {
// println!("service daemon on {}", name);
for serv in services {
if check_service(&serv.hostname, &serv.port).await.is_err() {
if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal);
}
error!(
"Service {}:{} is unreachable for process {}",
&serv.hostname, &serv.port, &name
);
match serv.triggers.on_lost.as_str() {
"stay" => {
tx.send(4).await.unwrap();
continue;
}
"stop" => {
if looped_service_connecting(name, serv).await.is_err() {
tx.send(5).await.unwrap();
tokio::task::yield_now().await;
return Err(CustomError::Fatal);
}
}
"hold" => {
// if is_frozen(name).await {
// return Err(CustomError::Fatal);
// }
if looped_service_connecting(name, serv).await.is_err() {
tx.send(6).await.unwrap();
tokio::task::yield_now().await;
return Err(CustomError::Fatal);
}
}
_ => {
tx.send(101).await.unwrap();
return Err(CustomError::Fatal);
}
}
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
// pub async fn service_handler(
// name: &str,
// services: &Vec<Services>,
// tx: Arc<mpsc::Sender<u8>>,
// ) -> Result<(), CustomError> {
// // println!("service daemon on {}", name);
// for serv in services {
// if check_service(&serv.hostname, &serv.port).await.is_err() {
// if !is_active(name).await || is_frozen(name).await {
// return Err(CustomError::Fatal);
// }
// error!(
// "Service {}:{} is unreachable for process {}",
// &serv.hostname, &serv.port, &name
// );
// match serv.triggers.on_lost.as_str() {
// "stay" => {
// tx.send(4).await.unwrap();
// continue;
// }
// "stop" => {
// if looped_service_connecting(name, serv).await.is_err() {
// tx.send(5).await.unwrap();
// tokio::task::yield_now().await;
// return Err(CustomError::Fatal);
// }
// }
// "hold" => {
// // if is_frozen(name).await {
// // return Err(CustomError::Fatal);
// // }
// if looped_service_connecting(name, serv).await.is_err() {
// tx.send(6).await.unwrap();
// tokio::task::yield_now().await;
// return Err(CustomError::Fatal);
// }
// }
// _ => {
// tx.send(101).await.unwrap();
// return Err(CustomError::Fatal);
// }
// }
// }
// }
// tokio::time::sleep(Duration::from_millis(100)).await;
// Ok(())
// }
/// # Fn `looped_service_connecting`
/// ## for service's state check in loop (with delay and restriction of attempts)
@ -80,54 +263,54 @@ pub async fn service_handler(
///
/// *depends on* : fn `check_service`
///
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
if serv.triggers.wait == 0 {
loop {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!(
"Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port
);
match check_service(&serv.hostname, &serv.port).await {
Ok(_) => {
log::info!(
"Successfully connected to {} from {} process!",
&serv.hostname,
&name
);
break;
}
Err(_) => {
tokio::task::yield_now().await;
}
}
}
Ok(())
} else {
let start = Instant::now();
while start.elapsed().as_secs() < serv.triggers.wait.into() {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!(
"Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port
);
match check_service(&serv.hostname, &serv.port).await {
Ok(_) => {
log::info!(
"Successfully connected to {} from {} process!",
&serv.hostname,
&name
);
return Ok(());
}
Err(_) => {
tokio::task::yield_now().await;
}
}
}
Err(CustomError::Fatal)
}
}
// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
// if serv.triggers.wait == 0 {
// loop {
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
// warn!(
// "Attempting to connect from {} process to {}:{}",
// &name, &serv.hostname, &serv.port
// );
// match check_service(&serv.hostname, &serv.port).await {
// Ok(_) => {
// log::info!(
// "Successfully connected to {} from {} process!",
// &serv.hostname,
// &name
// );
// break;
// }
// Err(_) => {
// tokio::task::yield_now().await;
// }
// }
// }
// Ok(())
// } else {
// let start = Instant::now();
// while start.elapsed().as_secs() < serv.triggers.wait.into() {
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
// warn!(
// "Attempting to connect from {} process to {}:{}",
// &name, &serv.hostname, &serv.port
// );
// match check_service(&serv.hostname, &serv.port).await {
// Ok(_) => {
// log::info!(
// "Successfully connected to {} from {} process!",
// &serv.hostname,
// &name
// );
// return Ok(());
// }
// Err(_) => {
// tokio::task::yield_now().await;
// }
// }
// }
// Err(CustomError::Fatal)
// }
// }
/// # Fn `check_service`
/// ## for check current service's availiability