Compare commits

..

1 Commits

Author SHA1 Message Date
YurijO f6e440bc62 Merge pull request 'rc' (#37) from rc into master
Reviewed-on: http://192.168.2.61/VladislavD/runner-rs/pulls/37
2025-01-23 12:52:15 +03:00
18 changed files with 552 additions and 1724 deletions

1
.gitignore vendored
View File

@ -4,4 +4,3 @@
Cargo.lock Cargo.lock
hagent_test.sock hagent_test.sock
release release
*.sock

View File

@ -1,6 +1,6 @@
[package] [package]
name = "noxis-cli" name = "noxis-cli"
version = "0.2.7" version = "0.2.4"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -2,17 +2,11 @@ use clap::{Parser, Subcommand};
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
pub struct Cli { pub struct Cli {
#[arg(
short,
default_value="noxis-rs.sock",
help="explicit specify of NOXIS Socket file"
)]
pub socket : String,
#[command( #[command(
subcommand, subcommand,
help = "to manage Noxis work", help = "to manage Noxis work",
)] )]
pub command : Commands, command : Commands,
} }
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)] #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
@ -56,13 +50,13 @@ pub struct StartAction {
num_args = 1.., num_args = 1..,
value_delimiter = ' ' value_delimiter = ' '
)] )]
pub flags : Vec<String>, flags : Vec<String>,
} }
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
pub struct ConfigCommand { pub struct ConfigCommand {
#[command(subcommand)] #[command(subcommand)]
pub action : ConfigAction, action : ConfigAction,
} }
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)] #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
@ -89,12 +83,12 @@ pub struct LocalConfig {
action, action,
help = "to read following input as JSON", help = "to read following input as JSON",
)] )]
pub is_json : bool, is_json : bool,
// value // value
#[arg( #[arg(
help = "path to config file or config String (with --json flag)", help = "path to config file or config String (with --json flag)",
)] )]
pub config : String, config : String,
} }
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
@ -102,16 +96,16 @@ pub struct ProcessCommand {
#[arg( #[arg(
help = "name of needed process", help = "name of needed process",
)] )]
pub process : String, process : String,
#[command( #[command(
subcommand, subcommand,
help = "To get current process's status", help = "To get current process's status",
)] )]
pub action : ProcessAction, action : ProcessAction,
} }
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)] #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
pub enum ProcessAction { enum ProcessAction {
#[command( #[command(
about = "To get info about current process status", about = "To get info about current process status",
)] )]

View File

@ -1,15 +1,14 @@
use thiserror::Error; use thiserror::Error;
use super::cli_net::NOXIS_RS_CREDS;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum NoxisCliError { pub enum NoxisCliError {
#[error("Can't find socket `{0}`. Error : {1}")] #[error("Can't send any data to {:?}. Noxis-rs daemon is disabled or can't be accessed", NOXIS_RS_CREDS)]
NoxisDaemonMissing(String, String), NoxisDaemonMissing,
#[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")] #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's web-functionality")]
PortIsNotWritable, PortIsNotWritable,
#[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")] #[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")]
CliPromptCanNotBeSent, CliPromptCanNotBeSent,
#[error("Can't parse CLI struct and send as byte stream")] #[error("Can't parse CLI struct and send as byte stream")]
ToStringCliParsingParsing, ToStringCliParsingParsing,
#[error("Can't read Noxis response due to {0}")]
CliResponseReadError(String)
} }

View File

@ -1,30 +1,32 @@
use tokio::net::UnixStream; use tokio::net::TcpStream;
use tokio::io::{AsyncWriteExt, AsyncReadExt}; use tokio::io::AsyncWriteExt;
use tokio::time::{Duration, sleep}; use tokio::time::{Duration, sleep};
use anyhow::Result; use anyhow::Result;
use super::Cli; use super::Cli;
use super::cli_error::NoxisCliError; use super::cli_error::NoxisCliError;
async fn create_us_stream(cli: &Cli) -> Result<UnixStream> { pub const NOXIS_RS_CREDS: &str = "127.0.0.1:7753";
Ok(UnixStream::connect(&cli.socket).await.map_err(|er| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string(), er.to_string()))?)
pub async fn create_tcp_stream() -> Result<TcpStream> {
Ok(TcpStream::connect(NOXIS_RS_CREDS).await.map_err(|_| NoxisCliError::NoxisDaemonMissing)?)
} }
pub async fn try_send(cli: Cli) -> Result<()> { pub async fn try_send(stream: Result<TcpStream>, params: Cli) -> Result<()> {
// let stream = create_us_stream(&cli).await; use serde_json::to_string;
let mut stream = create_us_stream(&cli).await?; let mut stream = stream.map_err(|_| NoxisCliError::NoxisDaemonMissing)?;
loop {
if stream.writable().await.is_err() {
sleep(Duration::from_millis(100)).await;
continue;
}
// let msg: Cli = from_str(&format!("{:?}", params))?;
let msg= to_string(&params).map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
// let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@";
let msg = serde_json::to_vec(&cli) stream.write_all(msg.as_bytes()).await.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
.map_err(|_| NoxisCliError::ToStringCliParsingParsing)?; // ...
break;
stream.write_all(&msg) }
.await
.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
let mut response = [0; 1024];
stream.read(&mut response)
.await
.map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?;
println!("Received response: {}", String::from_utf8_lossy(&response));
Ok(()) Ok(())
} }

View File

@ -4,12 +4,12 @@ mod cli_error;
use clap::Parser; use clap::Parser;
use cli::Cli; use cli::Cli;
use cli_net::try_send; use cli_net::{create_tcp_stream, try_send};
use anyhow::Result; use anyhow::Result;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()>{ async fn main() -> Result<()>{
let cli = Cli::parse(); let cli = Cli::parse();
try_send(cli).await?; try_send(create_tcp_stream().await, cli).await?;
Ok(()) Ok(())
} }

View File

@ -1,6 +1,6 @@
[package] [package]
name = "noxis-rs" name = "noxis-rs"
version = "0.11.26" version = "0.11.10"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
@ -8,15 +8,13 @@ anyhow = "1.0.93"
chrono = "0.4.38" chrono = "0.4.38"
clap = { version = "4.5.21", features = ["derive"] } clap = { version = "4.5.21", features = ["derive"] }
env_logger = "0.11.3" env_logger = "0.11.3"
inotify = "0.11.0" inotify = "0.10.2"
log = "0.4.22" log = "0.4.22"
pcap = "2.2.0" pcap = "2.2.0"
redis = "0.29.2" redis = "0.25.4"
serde = { version = "1.0.203", features = ["derive"] } serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.118" serde_json = "1.0.118"
sysinfo = "0.32.0" sysinfo = "0.32.0"
tokio = { version = "1.38.0", features = ["full", "time"] } tokio = { version = "1.38.0", features = ["full", "time"] }
noxis-cli = { path = "../noxis-cli" } noxis-cli = { path = "../noxis-cli" }
dotenv = "0.15.0" dotenv = "0.15.0"
futures = "0.3.31"
async-trait = "0.1.88"

View File

@ -1,6 +1,6 @@
{ {
"dateOfCreation": "1721381809112", "dateOfCreation": "1721381809104",
"configServer": "192.168.2.37", "configServer": "localhost",
"processes": [ "processes": [
{ {
"name": "temp-process", "name": "temp-process",
@ -12,7 +12,7 @@
"src": "./tests/examples/", "src": "./tests/examples/",
"triggers": { "triggers": {
"onDelete": "stop", "onDelete": "stop",
"onChange": "restart" "onChange": "stay"
} }
} }
], ],
@ -22,7 +22,8 @@
"port": 443, "port": 443,
"triggers": { "triggers": {
"wait": 10, "wait": 10,
"onLost": "restart" "delay": 2,
"onLost": "hold"
} }
} }
] ]

View File

@ -1,6 +1,7 @@
mod options; mod options;
mod utils; mod utils;
use anyhow::Error;
use clap::Parser; use clap::Parser;
use log::{error, info}; use log::{error, info};
use options::config::*; use options::config::*;
@ -13,140 +14,84 @@ use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use utils::*; use utils::*;
use options::preboot::PrebootParams; use options::preboot::PrebootParams;
use tokio::sync::{broadcast, oneshot};
use options::config::v2::init_config_mechanism;
use utils::v2::init_monitoring;
#[tokio::main(flavor = "multi_thread", worker_threads = 4)] #[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()>{ async fn main() -> anyhow::Result<()>{
let preboot = Arc::new(PrebootParams::parse().validate()?); let preboot = Arc::new(PrebootParams::parse().validate()?);
let _ = setup_logger(); let _ = setup_logger();
info!("Noxis is configurating..."); info!("Runner is configurating...");
//
let (tx_brd, mut rx_brd) = broadcast::channel::<Processes>(1); // setting up redis connection \
// cli <-> config // then conf checks to choose the most actual \
let (tx_oneshot, rx_oneshot) = oneshot::channel::<Processes>(); let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
error!("No actual configuration for runner. Stopping...");
std::process::exit(1);
});
info!(
"Current runner configuration: {}",
&processes.date_of_creation
);
info!("Runner is ready. Initializing...");
if processes.processes.is_empty() {
error!("Processes list is null, runner-rs initialization is stopped");
return Err(Error::msg("Empty processes segment in config"));
}
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![]; let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// is in need to send to the signals handler thread
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
// initilaizing task for config manipulations for proc in processes.processes.iter() {
let config_module = tokio::spawn(async move { info!(
let _ = init_config_mechanism( "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
rx_oneshot, proc.name,
tx_brd, proc.path,
preboot.clone() proc.dependencies.files.len(),
).await; proc.dependencies.services.len()
);
// creating msg channel
// can or should be executed in new thread
let (tx, mut rx) = mpsc::channel::<u8>(1);
let proc = Arc::new(proc.clone());
let tx = Arc::new(tx.clone());
senders.push(Arc::clone(&tx.clone()));
let event = tokio::spawn(async move {
run_daemons(proc.clone(), tx.clone(), &mut rx).await;
}); });
handler.push(config_module); handler.push(event);
// initilaizing task for cli manipulation
let cli_module = tokio::spawn(async move {
if let Err(er) = init_cli_pipeline().await {
error!("CLI pipeline failed due to {}", er)
} }
});
handler.push(cli_module);
// initilaizing task for deinitializing `Noxis` // destructor addition
let ctrlc = tokio::spawn(async move { handler.push(tokio::spawn(async move {
if let Err(er) = set_valid_destructor(vec![].into()).await { if set_valid_destructor(Arc::new(senders)).await.is_err() {
error!("Destructor mod failed due to {}", er); error!("Linux signals handler creation failed. Terminating main thread...");
return;
} }
tokio::time::sleep(Duration::from_millis(200)).await;
info!("End of job. Terminating main thread...");
std::process::exit(0); std::process::exit(0);
}); }));
handler.push(ctrlc);
let monitoring = tokio::spawn(async move { // remote config update subscription
let config = { handler.push(tokio::spawn(async move {
let mut tick = tokio::time::interval(Duration::from_millis(500)); let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
loop { }));
tick.tick().await;
break match rx_brd.try_recv() { // cli pipeline
Ok(conf) => conf, handler.push(tokio::spawn(async move {
Err(_) => continue, let _ = init_cli_pipeline().await;
} }));
}
};
if let Err(er) = init_monitoring(config).await {
error!("Monitoring mod failed due to {}", er);
}
});
handler.push(monitoring);
for i in handler { for i in handler {
let _ = i.await; let _ = i.await;
} }
// setting up redis connection \
// then conf checks to choose the most actual \
// let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
// error!("No actual configuration for runner. Stopping...");
// std::process::exit(1);
// });
//
// info!(
// "Current runner configuration: {}",
// &processes.date_of_creation
// );
// info!("Runner is ready. Initializing...");
//
// if processes.processes.is_empty() {
// error!("Processes list is null, runner-rs initialization is stopped");
// return Err(Error::msg("Empty processes segment in config"));
// }
// let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// // is in need to send to the signals handler thread
// let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
//
// for proc in processes.processes.iter() {
// info!(
// "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
// proc.name,
// proc.path,
// proc.dependencies.files.len(),
// proc.dependencies.services.len()
// );
//
// // creating msg channel
// // can or should be executed in new thread
// let (tx, mut rx) = mpsc::channel::<u8>(1);
// let proc = Arc::new(proc.clone());
// let tx = Arc::new(tx.clone());
//
// senders.push(Arc::clone(&tx.clone()));
//
// let event = tokio::spawn(async move {
// run_daemons(proc.clone(), tx.clone(), &mut rx).await;
// });
// handler.push(event);
// }
//
// // destructor addition
// handler.push(tokio::spawn(async move {
// if set_valid_destructor(Arc::new(senders)).await.is_err() {
// error!("Linux signals handler creation failed. Terminating main thread...");
// return;
// }
//
// tokio::time::sleep(Duration::from_millis(200)).await;
// info!("End of job. Terminating main thread...");
// std::process::exit(0);
// }));
//
// // remote config update subscription
// handler.push(tokio::spawn(async move {
// let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
// }));
//
// // cli pipeline
// handler.push(tokio::spawn(async move {
// let _ = init_cli_pipeline().await;
// }));
//
// for i in handler {
// let _ = i.await;
// }
Ok(()) Ok(())
} }

View File

@ -1,9 +1,12 @@
use log::{error, info}; use log::{error, info, warn};
use tokio::net::{ UnixStream, UnixListener }; use tokio::net::{TcpListener, TcpStream};
use anyhow::{Result as DynResult, Error};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use std::fs; use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}};
use tokio::io::{ AsyncWriteExt, AsyncReadExt}; // use std::io::BufReader;
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
use noxis_cli::Cli; use noxis_cli::Cli;
use serde_json::from_str;
/// # Fn `init_cli_pipeline` /// # Fn `init_cli_pipeline`
/// ## for catching all input requests from CLI /// ## for catching all input requests from CLI
@ -18,32 +21,49 @@ use noxis_cli::Cli;
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn init_cli_pipeline() -> anyhow::Result<()> { pub async fn init_cli_pipeline() -> DynResult<()> {
let socket_path = "noxis.sock"; match init_listener().await {
let _ = fs::remove_file(socket_path); Some(list) => {
match UnixListener::bind(socket_path) {
Ok(list) => {
// TODO: remove `unwrap`s
info!("Listening on {}", socket_path);
loop { loop {
match list.accept().await { if let Ok((socket, addr)) = list.accept().await {
Ok((socket, _)) => { // isolation
// tokio::spawn(); if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() {
process_connection(socket).await; warn!("Declined attempt to connect TCP-socket from {}", addr);
}, continue;
Err(er) => {
error!("Cannot poll connection to CLI due to {}", er);
sleep(Duration::from_millis(300)).await;
},
} }
process_connection(socket).await;
}
sleep(Duration::from_millis(500)).await;
} }
// Ok(()) // Ok(())
}, },
Err(er) => { None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use"))
error!("Failed to open UnixListener for CLI"); }
Err(er.into()) }
/// # Fn `init_listener`
/// ## for creating TCP-listener for communicating with CLI
///
/// *input* : -
///
/// *output* : `Some<TcpListener>` if port 7753 was opened | None if not
///
/// *initiator* : fn `init_cli_pipeline`
///
/// *managing* : `TcpListener` object to handle requests
///
/// *depends on* : `tokio::net::TcpListener`
///
async fn init_listener() -> Option<TcpListener> {
match TcpListener::bind("127.0.0.1:7753").await {
Ok(listener) => {
info!("Runner is listening localhost:7753");
Some(listener)
}, },
Err(_) => {
error!("Cannot create TCP listener for CLI");
None
}
} }
} }
@ -60,29 +80,27 @@ pub async fn init_cli_pipeline() -> anyhow::Result<()> {
/// ///
/// *depends on* : `tokio::net::TcpStream` /// *depends on* : `tokio::net::TcpStream`
/// ///
async fn process_connection(mut stream: UnixStream) { async fn process_connection(mut stream: TcpStream) {
let mut buf = vec![0; 1024]; let buf_reader = BufReader::new(stream.borrow_mut());
match stream.read(&mut buf).await { let mut rqst = buf_reader.lines();
Ok(0) => {
info!("Client disconnected ");
while let Ok(Some(line)) = rqst.next_line().await {
if line.is_empty() {
break
}
match from_str::<Cli>(&line) {
Ok(req) => {
// TODO: func wrapper
dbg!(req);
}, },
Ok(n) => { Err(_) => {
buf.truncate(n); break
info!("CLI have sent {} bytes", n);
match serde_json::from_slice::<Cli>(&buf) {
Ok(cli) => {
info!("Received CLI request: {:?}", cli);
let response = "OK";
if let Err(e) = stream.write_all(response.as_bytes()).await {
error!("Failed to send response: {}", e);
}
}
Err(e) => {
error!("Failed to parse CLI request: {}", e);
}
}
}, },
Err(e) => error!("Failed to read from socket: {}", e),
} }
let _ = stream.shutdown().await; println!("{}", line);
}
let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!";
stream.write_all(response.as_bytes()).await.unwrap();
} }

View File

@ -9,393 +9,9 @@ use std::sync::Arc;
use std::{env, fs}; use std::{env, fs};
use super::preboot::PrebootParams; use super::preboot::PrebootParams;
use tokio::time::{Duration, sleep}; use tokio::time::{Duration, sleep};
// use redis::PubSub;
use tokio::sync::{
oneshot,
oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender },
broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver };
use crate::utils::files::create_watcher;
use std::fs::File;
use inotify::EventMask;
// const CONFIG_PATH: &str = "settings.json"; // const CONFIG_PATH: &str = "settings.json";
pub mod v2 {
use std::path::PathBuf;
use crate::utils::get_container_id;
use super::*;
pub async fn init_config_mechanism(
// to handle cli config changes
cli_oneshot: OneShotReciever<Processes>,
// to share local config with PRCS, CLI_PIPELINE and CONFIG modules
brd_tx : BroadcastSender<Processes>,
// preboot params (args)
params : Arc<PrebootParams>
/*...*/
) {
// channel for pubsub to handle local config pulling
let local_config_brd_reciever = brd_tx.subscribe();
// channel between pub-sub mech and local config mech
let (tx_pb_lc, rx_pb_lc) = oneshot::channel::<bool>();
// channel between cli mech and local config mech
let (tx_cli_lc, rx_cli_lc) = oneshot::channel::<bool>();
// dbg!("before lc");
let params_clone = params.clone();
let for_lc_path = params.clone();
let lc_path = for_lc_path
.config
.to_str()
.unwrap_or("settings.json");
// future to init work with local config
let lc_future = tokio::spawn(
// let params = params.clone();
local_config_reciever(
params_clone,
rx_pb_lc,
rx_cli_lc,
Arc::new(brd_tx)
)
);
// dbg!("before pb");
// future to init work with pub sub mechanism
let pubsub_future = tokio::spawn(
pubsub_config_reciever(
tx_pb_lc,
params.clone(),
local_config_brd_reciever
)
);
// dbg!("before cli");
// future to catch new configs from cli pipeline
let cli_future = tokio::spawn(
from_cli_config_reciever(
cli_oneshot,
tx_cli_lc
)
);
// let _ = lc_future.await;
// dbg!("before select");
tokio::select! {
lc_result = lc_future => {
// dbg!("end of lc");
match lc_result {
Ok(res) => {
if res.is_ok() {
info!("Local config warding mechanism stopped, waiting for others ...");
sleep(Duration::from_millis(500)).await;
let _ = restart_main_thread();
}
else {
error!("Local config warding mechanism crushed, restarting ...");
let _ = restart_main_thread();
}
},
Err(_) => {
error!("Local config warding mechanism crushed, restarting ...");
let _ = restart_main_thread();
},
}
},
pb_result = pubsub_future => {
match pb_result {
Ok(res) => {
if res.is_ok() {
info!("New config was saved locally, restarting ...");
}
else {
error!("Pubsub mechanism crushed, restarting ...");
}
},
Err(_) => {
error!("Pubsub mechanism crushed, restarting ...");
},
}
let _ = restart_main_thread();
},
cli_config_option = cli_future => {
match cli_config_option {
Err(_) => error!("CLI pulling new config mechanism crushed, restarting ..."),
Ok(option_config) => {
match option_config {
None => error!("CLI pulling new config mechanism crushed, restarting ..."),
Some(config) => {
info!("New config was pulled from CLI, saving and restarting ...");
let _ = save_new_config(&config, lc_path);
},
}
},
}
let _ = restart_main_thread();
},
}
// dbg!("after select");
// TODO! futures + select! [OK]
// TODO! tests config
}
pub async fn get_redis_connection(params: &str) -> Option<Connection> {
for i in 1..=3 {
let redis_url = format!("redis://{}/", params);
info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i);
if let Ok(client) = Client::open(redis_url) {
if let Ok(conn) = client.get_connection() {
info!("Successfully opened Redis connection");
return Some(conn);
}
}
error!("Error with subscribing Redis stream on update. Retrying in 5 secs...");
sleep(Duration::from_secs(5)).await;
}
None
}
// loop checking redis pubsub
async fn pubsub_config_reciever(
// to stop checking local config
local_conf_tx : OneShotSender<bool>,
params : Arc<PrebootParams>,
tx_brd_local : BroadcastReceiver<Processes>,
) -> anyhow::Result<()>{
/*...*/
// dbg!("start of pb");
let mut tx_brd_local = tx_brd_local;
let local_config = if !tx_brd_local.is_empty() {
tx_brd_local.recv().await?
} else {
// Processes::default()
let mut tick = tokio::time::interval(Duration::from_millis(500));
loop {
tick.tick().await;
break match tx_brd_local.recv().await {
Ok(conf) => conf,
Err(_) => continue,
};
}
};
match get_redis_connection(&local_config.config_server).await {
Some(mut conn) => {
let mut pub_sub = conn.as_pubsub();
let channel_name = get_container_id().unwrap_or(String::from("default"));
let channel_name = channel_name.trim();
match pub_sub.subscribe(channel_name) {
Err(er) => {
error!("Cannot subscribe pubsub channel due to {}", &er);
return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
},
Ok(_) => {
info!("Successfully subscribed to {} pubsub channel", channel_name);
let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3)));
loop {
if let Ok(msg) = pub_sub.get_message() {
// dbg!("ok on get message");
let payload : Result<String, _> = msg.get_payload();
match payload {
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
Ok(payload) => {
if let Some(remote) = parse_extern_config(&payload) {
match config_comparing(&local_config, &remote) {
ConfigActuality::Local => {
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
},
ConfigActuality::Remote => {
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
// to stop watching local config file mechanism
let _ = local_conf_tx.send(true);
let config_path = params.config.to_str().unwrap_or("settings.json");
if save_new_config(&remote, &config_path).is_err() {
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
return Err(anyhow::Error::msg(
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
))
}
return Ok(());
},
}
}
else {
warn!("Invalid config was pulled from Redis channel")
}
},
}
}
// delay
tokio::task::yield_now().await;
}
},
}
},
None => {
sleep(Duration::from_secs(20)).await;
}
}
Ok(())
}
//
async fn local_config_reciever(
params : Arc<PrebootParams>,
pubsub_oneshot : OneShotReciever<bool>,
cli_oneshot : OneShotReciever<bool>,
brd_tx : Arc<BroadcastSender<Processes>>,
/*...*/
) -> anyhow::Result<()> {
/*...*/
// shadowing as mut
let mut pubsub_oneshot = pubsub_oneshot;
let mut cli_oneshot = cli_oneshot;
// fill with default empty config, mut to change later
let mut _current_config = Processes::default();
// PathBuf to &str to work with local config path as slice
let local_config_path = params
.config
.to_str()
.unwrap_or("settings.json");
match load_processes(local_config_path) {
// if local exists
Some(conf) => {
info!("Local config `{}` was found.", &conf.date_of_creation);
_current_config = conf;
if let Err(er) = brd_tx.send(_current_config.clone()) {
error!("Cannot share local config with broadcast due to {}", er);
}
},
// if local is not exist
None => {
warn!("Local config wasn't found. Waiting for new ...");
return Err(anyhow::Error::msg("No local config"));
// ...
},
}
// 100% local exists here
// create watcher on local config file
match create_watcher("", local_config_path) {
Ok(mut watcher) => {
loop {
let mut need_to_export_config = false;
// let mut need_to_recreate_watcher = false;
// return situations here
// 1) oneshot signal
// 2) if config was deleted -> recreate and fill with current config that is held here
// 3) if config was changed -> fill with current config that is held here
// catching signal from pubsub
// it's because pubsub mech pulled new valid and actual config and now it's time to ...
// ... overwrite local config file and restart main thread
if let Ok(_) = pubsub_oneshot.try_recv() {
sleep(Duration::from_secs(1)).await;
return Ok(());
}
// catching signal from cli
// it's because cli mech pulled new valid and actual config and now it's time to ...
// ... overwrite local config file and restart main thread (like in previous mechanism)
if let Ok(_) = cli_oneshot.try_recv() {
sleep(Duration::from_secs(1)).await;
return Ok(());
}
// ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ...
// ! ... FROM WATCHER"S BUFFER
// existing check
if !params.config.exists() {
warn!("Local config file was deleted or moved. Recreating new one with saved data ...");
need_to_export_config = true;
// need_to_recreate_watcher = true;
} else {
// changes check
let mut buffer = [0; 128];
let events = watcher.read_events(&mut buffer);
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
if !events.is_empty() {
warn!("Local config file was overwritten. Discarding changes ...");
need_to_export_config = true;
// events
// .iter()
// .any(|event| *event == EventMask::DELETE_SELF)
// .then(|| need_to_recreate_watcher = true);
}
}
}
// exporting data
if need_to_export_config {
if let Err(er) = export_saved_config_data_locally(&params.config, &_current_config).await {
error!("Cannot save actual imported config due to {}", er);
} else {
// recreation watcher (draining activity buffer mechanism)
// if local config file was deleted and recreated
// if local config file was modified locally
match create_watcher("", local_config_path) {
Ok(new) => watcher = new,
Err(er) => error!("Cannot create new watcher due to {}", er),
}
}
}
sleep(Duration::from_millis(300)).await;
// tokio::task::yield_now().await;
}
},
Err(_) => {
error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path);
return Err(anyhow::Error::msg("Cannot create watcher on local config file"));
},
}
}
// [:IN-TEST]
async fn from_cli_config_reciever(
cli_oneshot: OneShotReciever<Processes>,
to_local_tx: OneShotSender<bool>
) -> Option<Processes> {
/* match awaits til channel*/
// dbg!("start of cli");
loop {
if !cli_oneshot.is_empty() {
match cli_oneshot.await {
Ok(config_from_cli) => {
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
let _ = to_local_tx.send(true);
return Some(config_from_cli)
},
_ => return None,
}
}
sleep(Duration::from_millis(300)).await;
}
}
async fn export_saved_config_data_locally(
config_file_path: &PathBuf,
current_config: &Processes
) -> anyhow::Result<()> {
let mut file = File::create(config_file_path)?;
file.write_all(
serde_json::to_string_pretty(current_config)?.as_bytes()
)?;
Ok(())
// Ok(())
}
}
/// # Fn `load_processes` /// # Fn `load_processes`
/// ## for reading and parsing *local* storing config /// ## for reading and parsing *local* storing config
/// ///
@ -438,14 +54,14 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
error!("Invalid character in config file. Config path was set to default"); error!("Invalid character in config file. Config path was set to default");
"settings.json" "settings.json"
}); });
info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url); info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url);
match load_processes(config_path) { match load_processes(config_path) {
Some(local_conf) => { Some(local_conf) => {
info!( info!(
"Found local configuration, version - {}", "Found local configuration, version - {}",
&local_conf.date_of_creation &local_conf.date_of_creation
); );
if !params.no_sub { if !params.no_remote_config {
if let Some(remote_conf) = if let Some(remote_conf) =
// TODO : rework with pubsub mech // TODO : rework with pubsub mech
once_get_remote_configuration(&format!("redis://{}/", &params.remote_server_url)) once_get_remote_configuration(&format!("redis://{}/", &params.remote_server_url))
@ -469,7 +85,7 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
} }
None => { None => {
warn!("No local valid conf was found. Trying to pull remote one..."); warn!("No local valid conf was found. Trying to pull remote one...");
if !params.no_sub { if !params.no_remote_config {
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", &params.remote_server_url))); let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", &params.remote_server_url)));
if let Some(conf) = get_remote_conf_watcher(&mut conn).await { if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
@ -706,7 +322,7 @@ fn restart_main_thread() -> std::io::Result<()> {
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> { pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
if params.no_sub { if params.no_sub || params.no_remote_config {
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) { if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) {
@ -781,9 +397,6 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<Pr
/// *depends on* : `Processes`, `ConfigActuality` /// *depends on* : `Processes`, `ConfigActuality`
/// ///
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
if local.is_default() {
return ConfigActuality::Remote;
}
let local_date: u64 = local.date_of_creation.parse().unwrap(); let local_date: u64 = local.date_of_creation.parse().unwrap();
let remote_date: u64 = remote.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap();

View File

@ -13,7 +13,7 @@ enum EnvVars {
NoxisNoHagent, NoxisNoHagent,
NoxisNoLogs, NoxisNoLogs,
NoxisRefreshLogs, NoxisRefreshLogs,
// NoxisNoRemoteConfig, NoxisNoRemoteConfig,
NoxisNoConfigSub, NoxisNoConfigSub,
NoxisSocketPath, NoxisSocketPath,
NoxisLogTo, NoxisLogTo,
@ -29,7 +29,7 @@ impl std::fmt::Display for EnvVars {
EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"), EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"),
EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"), EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"),
EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"), EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"),
// EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"), EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"),
EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"), EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"),
EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"), EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"),
EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"), EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"),
@ -48,7 +48,7 @@ impl<'a> EnvVars {
EnvVars::NoxisNoHagent => "false", EnvVars::NoxisNoHagent => "false",
EnvVars::NoxisNoLogs => "false", EnvVars::NoxisNoLogs => "false",
EnvVars::NoxisRefreshLogs => "false", EnvVars::NoxisRefreshLogs => "false",
// EnvVars::NoxisNoRemoteConfig => "false", EnvVars::NoxisNoRemoteConfig => "false",
EnvVars::NoxisNoConfigSub => "false", EnvVars::NoxisNoConfigSub => "false",
EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock", EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock",
EnvVars::NoxisLogTo => "./", EnvVars::NoxisLogTo => "./",
@ -77,7 +77,7 @@ impl<'a> EnvVars {
Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string()); Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string());
Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string()); Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string());
Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string()); Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string());
// Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string()); Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string());
Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string()); Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string());
Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap()); Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap());
Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap()); Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap());
@ -147,6 +147,12 @@ impl std::fmt::Display for MetricsPrebootParams {
/// noxis-rs ... --refresh-logs ... /// noxis-rs ... --refresh-logs ...
/// ``` /// ```
/// ///
/// `--no-remote-config` - to disable work with Redis as config producer
/// ### usage :
/// ``` bash
/// noxis-rs ... --no-remote-config ...
/// ```
///
/// `--no-sub` - to disable Redis subscribtion mechanism /// `--no-sub` - to disable Redis subscribtion mechanism
/// ### usage : /// ### usage :
/// ``` bash /// ``` bash
@ -206,18 +212,17 @@ pub struct PrebootParams {
help="To clear logs directory" help="To clear logs directory"
)] )]
pub refresh_logs : bool, pub refresh_logs : bool,
// #[arg( #[arg(
// long = "no-remote-config", long = "no-remote-config",
// action, action,
// help="To disable work with remote config server", help="To disable work with remote config server",
// conflicts_with="no_sub")] conflicts_with="no_sub")]
// pub no_remote_config : bool, pub no_remote_config : bool,
#[arg( #[arg(
long = "no-sub", long = "no-sub",
action, action,
help="To disable Redis subscription mechanism", help="To disable subscription mechanism",
)] conflicts_with="no_remote_config")]
// conflicts_with="no_remote_config"
pub no_sub : bool, pub no_sub : bool,
// params (socket_path, log_to, remote_server_url, config) // params (socket_path, log_to, remote_server_url, config)
@ -238,7 +243,7 @@ pub struct PrebootParams {
#[arg( #[arg(
long = "remote-server-url", long = "remote-server-url",
default_value="localhost", default_value="localhost",
conflicts_with="no_sub", conflicts_with="no_remote_config",
help = "To set url of remote config server using in remote config pulling mechanism" help = "To set url of remote config server using in remote config pulling mechanism"
)] )]
pub remote_server_url : String, pub remote_server_url : String,
@ -283,17 +288,15 @@ impl PrebootParams {
// existing log dir // existing log dir
if !self.log_to.exists() && !self.no_logs { if !self.log_to.exists() && !self.no_logs {
eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default"); eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default");
self.refresh_logs = false;
self.log_to = PathBuf::from("./"); self.log_to = PathBuf::from("./");
// return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start")); // return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
} }
// existing sock file // existing sock file
if !self.config.exists() { if !self.config.exists() {
eprintln!("Error: Invalid character in config file. Config path was set to default"); eprintln!("Error: Invalid character in config file. Config path was set to default");
// TODO : ??? wtf is going with 2 paths let config = PathBuf::from("/etc/settings.json");
let config = PathBuf::from("/etc/enode/noxis/settings.json"); if !config.exists() && self.no_remote_config {
if !config.exists() && self.no_sub { return Err(Error::msg("Noxis cannot run without config. Create local config or enable remote-config mechanism"));
return Err(Error::msg("Noxis cannot run without config. Create local config or enable pubsub mechanism"));
} }
self.config = PathBuf::from("settings.json"); self.config = PathBuf::from("settings.json");
// return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start")); // return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
@ -350,20 +353,20 @@ mod preboot_unitests{
"runner-rs", "runner-rs",
"--no-sub", "--no-sub",
"--remote-server-url", "redis://127.0.0.1" "--remote-server-url", "redis://127.0.0.1"
]).is_ok())
}
#[test]
fn parsing_config_invalid_args_noremote_nosub() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--no-remote-config", "--no-sub"
]).is_err()) ]).is_err())
} }
// #[test]
// fn parsing_config_invalid_args_noremote_nosub() {
// assert!(PrebootParams::try_parse_from(vec![
// "runner-rs",
// "--no-remote-config", "--no-sub"
// ]).is_err())
// }
#[test] #[test]
fn parsing_config_invalid_args_noremote_remoteurl() { fn parsing_config_invalid_args_noremote_remoteurl() {
assert!(PrebootParams::try_parse_from(vec![ assert!(PrebootParams::try_parse_from(vec![
"runner-rs", "runner-rs",
"--no-sub", "--no-remote-config",
"--remote-server-url", "redis://127.0.0.1" "--remote-server-url", "redis://127.0.0.1"
]).is_err()) ]).is_err())
} }

View File

@ -22,7 +22,7 @@ type SendersVec = Arc<Vec<Arc<mpsc::Sender<u8>>>>;
/// ///
/// *depends on* : Sig, Signals /// *depends on* : Sig, Signals
/// ///
pub async fn set_valid_destructor(senders: SendersVec) -> anyhow::Result<()> { pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError> {
let (mut int, mut term, mut stop) = ( let (mut int, mut term, mut stop) = (
Sig::new(Signals::Sigint, senders.clone()), Sig::new(Signals::Sigint, senders.clone()),
Sig::new(Signals::Sigterm, senders.clone()), Sig::new(Signals::Sigterm, senders.clone()),

View File

@ -2,114 +2,7 @@
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use std::sync::Arc;
/// Kind of dependency a tracked process relies on; used to label
/// negative events and log messages (see its `Display` impl below).
#[derive(Debug)]
pub enum DependencyType {
    File,
    Service,
}
/// Reachability state of a service dependency.
/// NOTE(review): not referenced elsewhere in this chunk — confirm it is
/// actually consumed by the services controller.
#[derive(Debug)]
pub enum ServiceState {
    Ok,
    Unavailable
}
/// Wait interval for a service dependency check (unit not shown in this
/// chunk — presumably seconds; TODO confirm against the service checker).
pub struct ServiceWaitConfig(u32);

impl Default for ServiceWaitConfig {
    fn default() -> Self {
        // Fallback wait value when the config does not provide one.
        Self(5)
    }
}
/// Which file event fired; converted into a negative `Events` message
/// for the owning process controller via `event`/`event_from_file_trigger_controller`.
pub enum FileTriggerType {
    OnChange,
    OnDelete,
}
/// Human-readable description of a file event, used in log messages.
impl std::fmt::Display for FileTriggerType {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Idiom fix: tail expression instead of `return match ...`.
        match self {
            FileTriggerType::OnChange => write!(f, "File was changed"),
            FileTriggerType::OnDelete => write!(f, "File was moved or deleted"),
        }
    }
}
impl FileTriggerType {
    // Fixes: dropped the unused lifetime parameter `'a` from the impl header
    // (it constrained nothing) and replaced `return match` with tail expressions.

    /// Builds the negative `Events` variant matching this trigger type,
    /// carrying (file name, dependency kind, reaction/trigger name).
    pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
        match self {
            FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
            FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger)),
        }
    }

    /// Same as [`Self::event`], but selects the reaction string out of a
    /// `FileTriggersForController` (`on_change` vs `on_delete`) by trigger type.
    pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
        match self {
            FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
            FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger.on_delete.clone())),
        }
    }
}
/// Trigger configuration attached to a dependency: reaction names for
/// file events, or a reaction name plus a wait budget for service loss.
#[derive(Debug)]
pub enum Triggers {
    File { on_change: Arc<str>, on_delete: Arc<str> },
    Service {on_lost: Arc<str>, wait: u32},
}
impl Triggers {
    /// Wraps a pair of file reactions into the `File` variant.
    pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers {
        Self::File { on_change, on_delete }
    }

    /// Wraps a service-loss reaction and its wait budget into `Service`.
    pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers {
        Self::Service { on_lost, wait: wait_time }
    }

    /// Maps a service trigger to its "service unreachable" negative event;
    /// file triggers yield `None`.
    pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> {
        match self {
            Self::Service { on_lost, .. } => Some(Events::Negative(
                NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone()),
            )),
            Self::File { .. } => None,
        }
    }
}
/// File-event reaction names handed to a `FilesController`, keyed per process.
#[derive(Debug)]
pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> }
// NOTE(review): service-side counterpart; not referenced in this chunk —
// confirm it is used before keeping it.
pub struct ServiceTriggersForController(Arc<str>);
/// Log/event label for the dependency kind.
impl std::fmt::Display for DependencyType {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Idiom fix: tail expression instead of `return match ...`.
        match self {
            DependencyType::File => write!(f, "File"),
            DependencyType::Service => write!(f, "Service"),
        }
    }
}
/// Lifecycle state of a tracked process as seen by its controller.
#[derive(Debug)]
pub enum ProcessState {
    Pending,
    Holding,
    Stopped,
    // presumably: stopped explicitly via CLI rather than by a dependency
    // event — TODO confirm against the process controller
    StoppedByCli,
}
/// Message sent from dependency controllers to process controllers.
#[derive(Debug)]
pub enum Events {
    // dependency identified by its code name reports "all good"
    Positive(Arc<str>),
    Negative(NegativeOutcomes)
}
/// Negative dependency outcomes. Tuple layout (see the builders in
/// `FileTriggerType`): (dependency name, dependency kind, reaction/trigger name).
#[derive(Debug)]
pub enum NegativeOutcomes {
    FileWasChanged(Arc<str>, DependencyType, Arc<str>),
    FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>),
    ServiceIsUnreachable(Arc<str>, DependencyType, Arc<str>),
}
/// A unit the supervisor loop can drive: one call performs a single
/// monitoring/processing pass for the implementor.
#[async_trait]
pub trait ProcessUnit {
    async fn process(&mut self);
}
/// # an Error enum (next will be deleted and replaced) /// # an Error enum (next will be deleted and replaced)
pub enum CustomError { pub enum CustomError {
Fatal, Fatal,
@ -147,22 +40,6 @@ pub struct Processes {
pub processes: Vec<TrackingProcess>, pub processes: Vec<TrackingProcess>,
} }
impl Default for Processes {
    /// Placeholder config: empty process list and an empty creation date
    /// (the empty date is what `is_default` keys off).
    fn default() -> Self {
        let date_of_creation = String::new();
        let config_server = "default".to_owned();
        let processes = Vec::new();
        Self { date_of_creation, config_server, processes }
    }
}
impl Processes {
    /// True when this config is the `Default` placeholder (empty
    /// `date_of_creation`) rather than one loaded from disk or Redis.
    pub fn is_default(&self) -> bool {
        self.date_of_creation.is_empty()
    }
}
/// # Struct for the 2nd level in json conf file /// # Struct for the 2nd level in json conf file
/// ## for each process to contain info, such as name, path and dependencies /// ## for each process to contain info, such as name, path and dependencies
/// ///
@ -258,7 +135,7 @@ pub struct Files {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Services { pub struct Services {
pub hostname: String, pub hostname: String,
pub port: Option<u32>, pub port: u32,
pub triggers: ServiceTriggers, pub triggers: ServiceTriggers,
} }
@ -282,6 +159,7 @@ pub struct Services {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServiceTriggers { pub struct ServiceTriggers {
pub wait: u32, pub wait: u32,
pub delay: u32,
#[serde(rename = "onLost")] #[serde(rename = "onLost")]
pub on_lost: String, pub on_lost: String,
} }

View File

@ -6,207 +6,25 @@ pub mod services;
// TODO : saving current flags state // TODO : saving current flags state
use crate::options::structs::{CustomError, TrackingProcess, Processes}; use crate::options::structs::CustomError;
// use files::create_watcher; use crate::options::structs::TrackingProcess;
// use files::file_handler; use files::create_watcher;
// use inotify::Inotify; use files::file_handler;
use log::{error, warn, info}; use inotify::Inotify;
use log::{error, warn};
use prcs::{ use prcs::{
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process, freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
unfreeze_process, unfreeze_process,
}; };
// use services::service_handler; use services::service_handler;
use std::process::Command; use std::process::Command;
use std::sync::Arc; use std::sync::Arc;
// use tokio::join; use tokio::join;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::Duration; use tokio::time::Duration;
// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
// controllers import
use prcs::v2::ProcessesController;
use files::v2::FilesController;
use services::v2::ServicesController;
use async_trait::async_trait;
const GET_ID_CMD: &str = "hostname"; const GET_ID_CMD: &str = "hostname";
pub mod v2 {
    use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
    use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
    use super::*;

    /// Result of one controller task: carries the controller value back so
    /// the supervisor can requeue it after its `process()` pass completes.
    #[derive(Debug)]
    enum ControllerResult {
        Process(Option<ProcessesController>),
        File(Option<FilesController>),
        Service(Option<ServicesController>),
    }

    /// Round-robin scheduler over the three controller kinds: controllers
    /// are popped from the queue front, run as tokio tasks, and pushed back.
    #[derive(Debug)]
    struct Supervisor {
        prcs : LinkedList<ProcessesController>,
        files : LinkedList<FilesController>,
        services : LinkedList<ServicesController>,
    }
    impl Supervisor {
        pub fn new() -> Supervisor {
            Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
        }
        /// Builds controllers from the parsed config: one process controller
        /// per unique binary; file/service controllers for the same resource
        /// are deduplicated and shared between subscribing processes.
        pub async fn with_config(mut self, config: &Processes) -> Supervisor {
            let _ = config.processes.iter()
                .for_each(|prc| {
                    // NOTE(review): names are swapped relative to convention —
                    // `rx` here is the mpsc *Sender* and `tx` the *Receiver*;
                    // the usage below is consistent with that swap.
                    let (rx, tx) = mpsc::channel::<Events>(10);
                    let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
                    if !self.prcs.contains(&temp) {
                        self.prcs.push_back(temp);
                    }
                    let rx = Arc::new(rx);
                    let proc_name: Arc<str> = Arc::from(prc.name.clone());
                    // file dependencies
                    let _ = prc.dependencies.files.iter()
                        .for_each(|file| {
                            let mut hm = HashMap::new();
                            let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())};
                            hm.insert(proc_name.clone(), (triggers, rx.clone()));
                            let tempfile = FilesController::new(&file.filename.as_str(), hm)
                                .with_path(&file.src);
                            if let Ok(file) = tempfile {
                                // same file already tracked -> merge subscribers;
                                // otherwise track it as a new controller
                                if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) {
                                    current_file.add_event(file);
                                } else {
                                    self.files.push_back(file);
                                }
                            }
                        });
                    // servs
                    let _ = prc.dependencies.services.iter()
                        .for_each(|serv| {
                            let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref());
                            // preparations
                            let rx = rx.clone();
                            let serv_cont = ServicesController::new().with_access_name(
                                &serv.hostname,
                                &access_url
                            );
                            // triggers
                            let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone());
                            let triggers = Triggers::new_service(arc, serv.triggers.wait);
                            // same service already tracked -> just register this process
                            if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
                                proc.add_process(&prc.name, triggers, rx);
                            } else {
                                // vecdeque for queue
                                let mut vec: VecDeque<Arc<str>> = VecDeque::new();
                                vec.push_back(proc_name.clone());
                                // connection_queue
                                let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> = BTreeMap::new();
                                connection_queue.insert(serv.triggers.wait, vec);
                                // event_reg
                                let mut hm = HashMap::new();
                                hm.insert(proc_name.clone(), (triggers, rx));
                                let serv_cont = serv_cont.with_params(connection_queue, hm);
                                self.services.push_back(serv_cont);
                            }
                        });
                });
            self
        }
        /// One-line summary of tracked controller counts (for the startup log).
        pub fn get_stats(&self) -> String {
            format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
        }
    }
    #[async_trait]
    impl ProcessUnit for Supervisor {
        /// Main monitoring loop: each iteration spawns at most one controller
        /// of each kind concurrently, awaits them, requeues survivors, then
        /// sleeps 100 ms. Never returns.
        async fn process(&mut self) {
            info!("Initializing monitoring ...");
            loop {
                // dbg!(&self);
                let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
                // let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
                // let res = tokio::join!(prc.process(), file.process(), serv.process());
                if let Some(mut val) = self.prcs.pop_front() {
                    tasks.push(
                        tokio::spawn( async move {
                            val.process().await;
                            ControllerResult::Process(Some(val))
                        })
                    );
                }
                if let Some(mut val) = self.files.pop_front() {
                    tasks.push(
                        tokio::spawn( async move {
                            val.process().await;
                            ControllerResult::File(Some(val))
                        })
                    );
                }
                if let Some(mut val) = self.services.pop_front() {
                    tasks.push(
                        tokio::spawn( async move {
                            val.process().await;
                            ControllerResult::Service(Some(val))
                        })
                    );
                }
                // NOTE(review): a panicked task drops its controller for good —
                // it is only logged, never pushed back onto the queue.
                for task in tasks {
                    match task.await {
                        Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val),
                        Ok(ControllerResult::File(Some(val))) => self.files.push_back(val),
                        Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val),
                        Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."),
                        _ => { /* DEAD END (CAN NOT BE EXECUTED) */},
                    }
                }
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
    // spawn tasks
    // spawn prc
    // spawn files
    // spawn services
    // ## for ... i.await in loop

    /// Entry point: builds a `Supervisor` from `config` and runs its endless
    /// monitoring loop — the trailing `Ok(())` is effectively unreachable.
    pub async fn init_monitoring(
        config: Processes
    ) -> anyhow::Result<()> {
        let mut supervisor = Supervisor::new().with_config(&config).await;
        info!("Monitoring: {} ", &supervisor.get_stats());
        supervisor.process().await;
        Ok(())
    }
    // async fn generate_controllers<'a>(config: Processes) -> (HashSet<ProcessesController<'a>>, HashSet<FilesController<'a>>, HashSet<ServicesController<'a>>) {
    //     let mut prcs: HashSet<ProcessesController<'a>> = HashSet::new();
    //     let mut files: HashSet<FilesController<'a>> = HashSet::new();
    //     let mut services: HashSet<ServicesController<'a>> = HashSet::new();
    //     for prc in config.processes {
    //         let (rx, tx) = mpsc::channel::<Events<'a>>(10);
    //         // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path);
    //         let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path);
    //         let a = new_prc.process().await;
    //     }
    //     (prcs, files, services)
    // }
    // spawn prc check with semaphore check
    async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) }
    // spawn file check with semaphore check
    async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) }
    // spawn service check with semaphore check
    async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) }
}
/// # Fn `run_daemons` /// # Fn `run_daemons`
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel /// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
/// ///
@ -222,37 +40,37 @@ pub mod v2 {
/// ///
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process /// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
/// ///
// pub async fn run_daemons( pub async fn run_daemons(
// proc: Arc<TrackingProcess>, proc: Arc<TrackingProcess>,
// tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
// rx: &mut mpsc::Receiver<u8>, rx: &mut mpsc::Receiver<u8>,
// ) { ) {
// // creating watchers + ---buffers--- // creating watchers + ---buffers---
// let mut watchers: Vec<Inotify> = vec![]; let mut watchers: Vec<Inotify> = vec![];
// for file in proc.dependencies.files.clone().into_iter() { for file in proc.dependencies.files.clone().into_iter() {
// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
// watchers.push(watcher); watchers.push(watcher);
// } else { } else {
// let _ = tx.send(121).await; let _ = tx.send(121).await;
// } }
// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
// } }
// let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> = let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
// Arc::new(tokio::sync::Mutex::new(watchers)); Arc::new(tokio::sync::Mutex::new(watchers));
// loop { loop {
// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
// tokio::select! { tokio::select! {
// _ = run_hand => continue, _ = run_hand => continue,
// _val = rx.recv() => { _val = rx.recv() => {
// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
// return; return;
// } }
// }, },
// } }
// tokio::task::yield_now().await; tokio::task::yield_now().await;
// } }
// } }
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
match val { match val {
@ -315,6 +133,7 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
}, },
// // 9 - File-dependency change -> staying (after check) // // 9 - File-dependency change -> staying (after check)
9 => { 9 => {
// no need to trash logs
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
}, },
@ -382,36 +201,36 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
/// ///
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` /// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
/// ///
// pub async fn running_handler( pub async fn running_handler(
// prc: Arc<TrackingProcess>, prc: Arc<TrackingProcess>,
// tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
// watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>, watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
// ) { ) {
// // services and files check (once) // services and files check (once)
// let files_check = file_handler( let files_check = file_handler(
// &prc.name, &prc.name,
// &prc.dependencies.files, &prc.dependencies.files,
// tx.clone(), tx.clone(),
// watchers.clone(), watchers.clone(),
// ); );
// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
// let res = join!(files_check, services_check); let res = join!(files_check, services_check);
// // if inactive -> spawn checks -> active is true // if inactive -> spawn checks -> active is true
// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
// if start_process(&prc.name, &prc.path).await.is_err() { if start_process(&prc.name, &prc.path).await.is_err() {
// tx.send(3).await.unwrap(); tx.send(3).await.unwrap();
// return; return;
// } }
// } }
// // if frozen -> spawn checks -> unfreeze is true // if frozen -> spawn checks -> unfreeze is true
// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
// tx.send(10).await.unwrap(); tx.send(10).await.unwrap();
// return; return;
// } }
// // tokio::time::sleep(Duration::from_millis(100)).await; // tokio::time::sleep(Duration::from_millis(100)).await;
// tokio::task::yield_now().await; tokio::task::yield_now().await;
// } }
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
/// # Fn `get_container_id` /// # Fn `get_container_id`

View File

@ -5,145 +5,7 @@
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender as Sender;
use tokio::time::Duration; use tokio::time::Duration;
use crate::options::structs::Events;
use async_trait::async_trait;
pub mod v2 {
    use log::{error, info, warn};
    use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
    use super::*;
    use std::{collections::HashMap, path::Path};

    type MpscSender = Arc<Sender<Events>>;
    // process name -> (its file triggers, channel to its process controller)
    type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;

    /// Presence state of the watched file; notifications fire only on an
    /// Ok <-> NotFound transition, not on every poll.
    #[derive(Debug)]
    enum FileState {
        Ok,
        NotFound,
    }

    /// Watches a single file via inotify polling and notifies every
    /// subscribed process controller about change/delete events.
    #[derive(Debug)]
    pub struct FilesController {
        name : Arc<str>,
        path : String,
        // unique id: path + file name; also the `PartialEq` key
        code_name : Arc<str>,
        state : FileState,
        watcher : Option<Inotify>,
        triggers : EventHandlers,
    }
    // Two controllers are "equal" when they watch the same path+name.
    impl PartialEq for FilesController {
        fn eq(&self, other: &Self) -> bool {
            self.code_name == other.code_name
        }
    }
    impl FilesController {
        pub fn new(name: &str, triggers: EventHandlers) -> FilesController {
            let name: Arc<str> = Arc::from(name);
            Self {
                name : name.clone(),
                path : String::new(),
                // assumed present until the first check says otherwise
                state : FileState::Ok,
                watcher : None,
                triggers,
                code_name : name.clone(),
            }
        }
        /// Attaches the directory path, creates the inotify watcher, and
        /// finalizes `code_name` as `path + name`. Fails (with a logged error)
        /// when the watcher cannot be created.
        pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController> {
            self.path = path.as_ref().to_string_lossy().into_owned();
            self.watcher = {
                match create_watcher(&self.name, &self.path) {
                    Ok(val) => Some(val),
                    Err(er) => {
                        error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
                        return Err(er)
                    }
                }
            };
            self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name));
            Ok(self)
        }
        /// Merges the subscribers of a duplicate controller for the same file;
        /// on key collision the existing subscription wins (`or_insert`).
        pub fn add_event(&mut self, file_controller : FilesController) {
            for (k, v) in file_controller.triggers {
                self.triggers.entry(k).or_insert(v);
            }
        }
        /// Fans an event out to all subscribed processes. `None` means a
        /// positive "file is fine" event; `Some(kind)` a negative one.
        async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
            for (prc_name, (triggers, channel)) in &self.triggers {
                let msg = match &trigger_type {
                    None => {
                        Events::Positive(self.code_name.clone())
                    },
                    Some(event) => {
                        info!("Event on file {} ({}) : {}. Notifying `{}` ...", &self.name, &self.path, event, &prc_name);
                        event.event_from_file_trigger_controller(self.code_name.clone(), &triggers)
                    },
                };
                // send errors (e.g. closed receiver) are deliberately ignored
                let _ = channel.send(msg).await;
            }
        }
    }
    #[async_trait]
    impl ProcessUnit for FilesController {
        /// One polling pass: existence check, then a non-blocking drain of
        /// any pending inotify events.
        async fn process(&mut self) {
            // polling file check
            // 1) existing check
            // dbg!(&self);
            if let Ok(_) = check_file(&self.name, &self.path).await {
                // file reappeared after being missing -> reset to Ok and notify
                if let FileState::NotFound = self.state {
                    info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name);
                    self.state = FileState::Ok;
                    // reseting negative outcome in prc
                    self.trigger_on(None).await;
                }
                match &mut self.watcher {
                    Some(notify) => {
                        let mut buffer = [0; 1024];
                        if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
                            // notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);});
                            // todo!();
                            // NOTE(review): the second `any` resumes the iterator
                            // where the first stopped, so a MODIFY appearing before
                            // DELETE_SELF in the buffer can be missed, and the whole
                            // branch only fires when a MODIFY is found — confirm this
                            // ordering assumption is intended.
                            if let (recreate_watcher, true) = (
                                notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
                                notif_events.any(|mask| mask.mask == EventMask::MODIFY)
                            ) {
                                warn!("File {} ({}) was changed", self.name, &self.path);
                                // DELETE_SELF invalidates the watch -> rebuild it
                                if recreate_watcher {
                                    self.watcher = match create_watcher(&self.name, &self.path) {
                                        Ok(notifier) => Some(notifier),
                                        Err(er) => {
                                            error!("Failed to recreate watcher for {} ({}) due to {}",
                                                self.name,
                                                &self.path,
                                                er
                                            );
                                            None
                                        },
                                    }
                                }
                                self.trigger_on(Some(FileTriggerType::OnChange)).await;
                                return;
                            }
                        }
                    },
                    None => { /* DEAD END */},
                }
            } else {
                // file vanished since the last Ok poll -> notify once
                if let FileState::Ok = self.state {
                    warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
                    self.state = FileState::NotFound;
                    self.trigger_on(Some(FileTriggerType::OnDelete)).await;
                }
                return;
            }
            self.trigger_on(None).await;
            // 2) change check
        }
    }
}
/// # Fn `create_watcher` /// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events /// ## for creating watcher on file's delete | update events
@ -158,7 +20,7 @@
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> { pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
let src = format!("{}{}", path, filename); let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?; let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
@ -183,12 +45,12 @@
files: &[Files], files: &[Files],
tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>, watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> anyhow::Result<()> { ) -> Result<(), CustomError> {
for (i, file) in files.iter().enumerate() { for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename); // let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() { if check_file(&file.filename, &file.src).await.is_err() {
if !is_active(name).await || is_frozen(name).await { if !is_active(name).await || is_frozen(name).await {
return Err(anyhow::Error::msg("Process is frozen or stopped")); return Err(CustomError::Fatal);
} }
match file.triggers.on_delete.as_str() { match file.triggers.on_delete.as_str() {
"stay" => { "stay" => {
@ -199,18 +61,18 @@
if is_active(name).await { if is_active(name).await {
tx.send(1).await.unwrap(); tx.send(1).await.unwrap();
} }
return Err(anyhow::Error::msg("Process was stopped")); return Err(CustomError::Fatal);
} }
"hold" => { "hold" => {
if is_active(name).await { if is_active(name).await {
tx.send(2).await.unwrap(); tx.send(2).await.unwrap();
return Err(anyhow::Error::msg("Process was frozen")); return Err(CustomError::Fatal);
} }
} }
_ => { _ => {
tokio::time::sleep(Duration::from_millis(50)).await; tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap(); tx.send(101).await.unwrap();
return Err(anyhow::Error::msg("Impermissible character or word in file trigger")); return Err(CustomError::Fatal);
} }
} }
} else if is_active(name).await && !is_frozen(name).await { } else if is_active(name).await && !is_frozen(name).await {
@ -237,7 +99,7 @@
let mutex = notify.borrow_mut(); let mutex = notify.borrow_mut();
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); // *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
if let Ok(watcher) = create_watcher(&file.filename, &file.src) { if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
*mutex = watcher; *mutex = watcher;
} }
} }
@ -300,12 +162,12 @@
use super::*; use super::*;
#[tokio::test] #[tokio::test]
async fn try_to_create_watcher() { async fn try_to_create_watcher() {
let res = create_watcher("dep-file", "./tests/examples/"); let res = create_watcher("dep-file", "./tests/examples/").await;
assert!(res.is_ok()); assert!(res.is_ok());
} }
#[tokio::test] #[tokio::test]
async fn try_to_create_invalid_watcher() { async fn try_to_create_invalid_watcher() {
let res = create_watcher("invalid-file", "/path/to/the/no/dir"); let res = create_watcher("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err()); assert!(res.is_err());
} }
#[tokio::test] #[tokio::test]

View File

@ -1,132 +1,8 @@
use crate::options::structs::CustomError;
use log::{error, warn}; use log::{error, warn};
use std::process::{Command, Output}; use std::process::{Command, Output};
use std::sync::Arc; use std::sync::Arc;
use tokio::time::Duration; use tokio::time::Duration;
use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit};
use std::collections::HashSet;
use tokio::sync::mpsc::Receiver as MpscReciever;
use async_trait::async_trait;
pub mod v2 {
use log::info;
use crate::options::structs::DependencyType;
use std::path::Path;
use super::*;
#[derive(Debug)]
pub struct ProcessesController {
name: Arc<str>,
bin: String,
// obj: Arc<TrackingProcess>,
state: ProcessState,
event_reader: MpscReciever<Events>,
negative_events: HashSet<Arc<str>>,
}
impl PartialEq for ProcessesController {
fn eq(&self, other: &Self) -> bool {
self.bin == other.bin
}
}
impl ProcessesController {
pub fn new(name: &str, event_reader: MpscReciever<Events>) -> ProcessesController {
ProcessesController {
name : Arc::from(name),
bin : String::new(),
state : ProcessState::Stopped,
event_reader,
negative_events : HashSet::new(),
}
}
pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController {
self.bin = bin.as_ref().to_string_lossy().into_owned();
self
}
async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
match trigger {
"stay" => {
info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name);
},
"stop" => {
if is_active(&self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
terminate_process(&self.name).await;
self.state = ProcessState::Stopped;
}
},
"hold" => {
if !is_frozen(&self.name).await {
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
freeze_process(&self.name).await;
self.state = ProcessState::Holding;
}
},
"restart" => {
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
let _ = restart_process(&self.name, &self.bin).await;
},
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
}
tokio::time::sleep(Duration::from_micros(100)).await;
}
}
#[async_trait]
impl ProcessUnit for ProcessesController {
async fn process(&mut self) {
if self.negative_events.len() == 0 {
match self.state {
ProcessState::Holding => {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
if let Err(er) = unfreeze_process(&self.name).await {
error!("Cannot unfreeze process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
ProcessState::Stopped => {
info!("No negative dependecies events on {} process. Starting ...", self.name);
if let Err(er) = start_process(&self.name, &self.bin).await {
error!("Cannot start process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
_ => {},
}
}
while let Ok(event) = self.event_reader.try_recv() {
match event {
Events::Positive(target) => {
if self.negative_events.contains(&target) {
self.negative_events.remove(&target);
}
},
Events::Negative(event) => {
match event {
NegativeOutcomes::FileWasChanged(target, dep_type, trigger) |
NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) |
NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
if !self.negative_events.contains(&target) {
self.negative_events.insert(target.clone());
self.trigger_on(
&target,
&trigger,
dep_type
).await;
}
},
}
},
}
}
}
}
}
/// # Fn `get_pid` /// # Fn `get_pid`
/// ## for initializing process of unstoppable grubbing metrics. /// ## for initializing process of unstoppable grubbing metrics.
@ -286,11 +162,14 @@ pub async fn freeze_process(name: &str) {
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> { pub async fn unfreeze_process(name: &str) {
let _ = Command::new("pkill") let _ = Command::new("pkill")
.args(["-CONT", name]) .args(["-CONT", name])
.output()?; .output()
Ok(()) .unwrap_or_else(|_| {
error!("Failed to unfreeze process");
std::process::exit(101);
});
} }
/// # Fn `restart_process` /// # Fn `restart_process`
@ -306,7 +185,7 @@ pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> {
/// ///
/// *depends on* : fn `start_process`, fn `terminate_process` /// *depends on* : fn `start_process`, fn `terminate_process`
/// ///
pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> { pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> {
terminate_process(name).await; terminate_process(name).await;
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
start_process(name, path).await start_process(name, path).await
@ -325,7 +204,7 @@ pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> {
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> { pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
// let runsh = format!("{} {}", "exec", path); // let runsh = format!("{} {}", "exec", path);
let mut command = Command::new(path); let mut command = Command::new(path);
// command.arg(path); // command.arg(path);
@ -336,7 +215,8 @@ pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
Err(er) => { Err(er) => {
Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er))) println!("{:?}", er);
Err(CustomError::Fatal)
} }
} }
} }

View File

@ -1,193 +1,10 @@
use crate::options::structs::CustomError; use crate::options::structs::{CustomError, Services};
use super::prcs::{is_active, is_frozen};
use log::{error, warn}; use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc; use std::sync::Arc;
use tokio::time::Duration; use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender as Sender; use tokio::time::{Duration, Instant};
use async_trait::async_trait;
pub mod v2 {
use log::info;
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
use super::*;
use std::collections::{HashMap, BTreeMap, VecDeque};
type MpscSender = Arc<Sender<Events>>;
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
// type wrapper for service wait queue
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
#[derive(Debug)]
pub struct ServicesController {
// i.e. yandex.ru
#[allow(unused)]
name : String,
// i.e. yandex.ru:443
access_url : Arc<str>,
// "OK" or "Unavailable"
state: ServiceState,
// btree map with key as max wait time and it's key to hashmap
config: ConnectionQueue,
// Map of processes with their (trigger and mpsc sender)
event_registrator : EventHandlers,
}
impl PartialEq for ServicesController {
fn eq(&self, other: &Self) -> bool {
self.access_url == other.access_url
}
}
impl ServicesController {
pub fn new() -> ServicesController {
ServicesController {
name : String::new(),
access_url : Arc::from(String::new()),
state : ServiceState::Unavailable,
config: ConnectionQueue::new(),
event_registrator : EventHandlers::new(),
}
}
pub fn with_access_name(
mut self,
hostname: &str,
access_url: &str,
) -> ServicesController {
self.name = hostname.to_string();
self.access_url = Arc::from(access_url);
self
}
pub fn with_params(
mut self,
conn_queue: ConnectionQueue,
event_reg: EventHandlers,
) -> ServicesController {
self.config = conn_queue;
self.event_registrator = event_reg;
self
}
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
}
pub fn add_process(
&mut self,
proc_name: &str,
trigger: Triggers,
sender: MpscSender,
) {
let proc_name: Arc<str> = Arc::from(proc_name);
// queue add
if let Triggers::Service { wait, .. } = trigger {
self.config.entry(wait)
.and_modify(|el| el.push_back(proc_name.clone()))
.or_insert({
let mut temp = VecDeque::new();
temp.push_back(proc_name.clone());
temp
});
}
// event add
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
}
async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?;
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
}
Ok(())
}
async fn trigger_on(&mut self) {
match self.state {
ServiceState::Ok => {
let _ = self.event_registrator
.iter()
.map(|(_, (_, el))| async {
let _ = el.send(Events::Positive(self.access_url.clone())).await;
});
},
ServiceState::Unavailable => {
// looped check and notifying
self.looped_check().await;
},
}
}
async fn looped_check(self: &mut Self) {
let longest = self.config.last_entry().unwrap();
let longest = longest.key();
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
let timer = tokio::time::Instant::now();
let mut attempt: u32 = 1;
let access_url = Arc::new(self.access_url.clone());
// let event_registrator = &mut self.event_registrator;
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
// let access_url = access_url.clone();
loop {
interapter.tick().await;
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
attempt += 1;
let state_check_result = self.check_state().await;
if state_check_result.is_ok() {
info!("Connection to {} is `OK` now", &access_url);
self.state = ServiceState::Ok;
break;
} else {
let now = timer.elapsed();
let iterator = self.config.iter()
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
.flat_map(|(_, a)| a.iter().cloned())
.collect::<VecDeque<Arc<str>>>();
for name in iterator {
let proc_name = name.to_string();
info!("Trying to notify process `{}` ...", &proc_name);
let sender_opt = self.event_registrator.get(&name)
.map(|(trigger, sender)|
(trigger.to_service_negative_event(name.clone()), sender)
);
if let Some((tr, tx)) = sender_opt {
let _ = tx.send(tr.unwrap()).await;
} else {
error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url)
}
}
}
}
}).await {
info!("Timeout of establishing connection to {}. ", &access_url);
}
}
}
#[async_trait]
impl ProcessUnit for ServicesController {
async fn process(&mut self) {
// check_service(hostname, port)
let current_state = self.check_state().await;
match (&self.state, current_state) {
(ServiceState::Unavailable, Ok(_)) => {
warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
self.state = ServiceState::Ok;
self.trigger_on().await;
},
(ServiceState::Ok, Err(_)) => {
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
self.state = ServiceState::Unavailable;
self.trigger_on().await;
},
(ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url),
_ => { /* DEAD END WITH NO INTEREST */ },
}
}
}
}
/// # Fn `service_handler` /// # Fn `service_handler`
/// ## function to realize mechanism of current process' dep services monitoring /// ## function to realize mechanism of current process' dep services monitoring
@ -202,53 +19,53 @@ pub mod v2 {
/// ///
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` /// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
/// ///
// pub async fn service_handler( pub async fn service_handler(
// name: &str, name: &str,
// services: &Vec<Services>, services: &Vec<Services>,
// tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
// ) -> Result<(), CustomError> { ) -> Result<(), CustomError> {
// // println!("service daemon on {}", name); // println!("service daemon on {}", name);
// for serv in services { for serv in services {
// if check_service(&serv.hostname, &serv.port).await.is_err() { if check_service(&serv.hostname, &serv.port).await.is_err() {
// if !is_active(name).await || is_frozen(name).await { if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal);
}
error!(
"Service {}:{} is unreachable for process {}",
&serv.hostname, &serv.port, &name
);
match serv.triggers.on_lost.as_str() {
"stay" => {
tx.send(4).await.unwrap();
continue;
}
"stop" => {
if looped_service_connecting(name, serv).await.is_err() {
tx.send(5).await.unwrap();
tokio::task::yield_now().await;
return Err(CustomError::Fatal);
}
}
"hold" => {
// if is_frozen(name).await {
// return Err(CustomError::Fatal); // return Err(CustomError::Fatal);
// } // }
// error!( if looped_service_connecting(name, serv).await.is_err() {
// "Service {}:{} is unreachable for process {}", tx.send(6).await.unwrap();
// &serv.hostname, &serv.port, &name tokio::task::yield_now().await;
// ); return Err(CustomError::Fatal);
// match serv.triggers.on_lost.as_str() { }
// "stay" => { }
// tx.send(4).await.unwrap(); _ => {
// continue; tx.send(101).await.unwrap();
// } return Err(CustomError::Fatal);
// "stop" => { }
// if looped_service_connecting(name, serv).await.is_err() { }
// tx.send(5).await.unwrap(); }
// tokio::task::yield_now().await; }
// return Err(CustomError::Fatal); tokio::time::sleep(Duration::from_millis(100)).await;
// } Ok(())
// } }
// "hold" => {
// // if is_frozen(name).await {
// // return Err(CustomError::Fatal);
// // }
// if looped_service_connecting(name, serv).await.is_err() {
// tx.send(6).await.unwrap();
// tokio::task::yield_now().await;
// return Err(CustomError::Fatal);
// }
// }
// _ => {
// tx.send(101).await.unwrap();
// return Err(CustomError::Fatal);
// }
// }
// }
// }
// tokio::time::sleep(Duration::from_millis(100)).await;
// Ok(())
// }
/// # Fn `looped_service_connecting` /// # Fn `looped_service_connecting`
/// ## for service's state check in loop (with delay and restriction of attempts) /// ## for service's state check in loop (with delay and restriction of attempts)
@ -263,54 +80,54 @@ pub mod v2 {
/// ///
/// *depends on* : fn `check_service` /// *depends on* : fn `check_service`
/// ///
// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
// if serv.triggers.wait == 0 { if serv.triggers.wait == 0 {
// loop { loop {
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
// warn!( warn!(
// "Attempting to connect from {} process to {}:{}", "Attempting to connect from {} process to {}:{}",
// &name, &serv.hostname, &serv.port &name, &serv.hostname, &serv.port
// ); );
// match check_service(&serv.hostname, &serv.port).await { match check_service(&serv.hostname, &serv.port).await {
// Ok(_) => { Ok(_) => {
// log::info!( log::info!(
// "Successfully connected to {} from {} process!", "Successfully connected to {} from {} process!",
// &serv.hostname, &serv.hostname,
// &name &name
// ); );
// break; break;
// } }
// Err(_) => { Err(_) => {
// tokio::task::yield_now().await; tokio::task::yield_now().await;
// } }
// } }
// } }
// Ok(()) Ok(())
// } else { } else {
// let start = Instant::now(); let start = Instant::now();
// while start.elapsed().as_secs() < serv.triggers.wait.into() { while start.elapsed().as_secs() < serv.triggers.wait.into() {
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
// warn!( warn!(
// "Attempting to connect from {} process to {}:{}", "Attempting to connect from {} process to {}:{}",
// &name, &serv.hostname, &serv.port &name, &serv.hostname, &serv.port
// ); );
// match check_service(&serv.hostname, &serv.port).await { match check_service(&serv.hostname, &serv.port).await {
// Ok(_) => { Ok(_) => {
// log::info!( log::info!(
// "Successfully connected to {} from {} process!", "Successfully connected to {} from {} process!",
// &serv.hostname, &serv.hostname,
// &name &name
// ); );
// return Ok(()); return Ok(());
// } }
// Err(_) => { Err(_) => {
// tokio::task::yield_now().await; tokio::task::yield_now().await;
// } }
// } }
// } }
// Err(CustomError::Fatal) Err(CustomError::Fatal)
// } }
// } }
/// # Fn `check_service` /// # Fn `check_service`
/// ## for check current service's availiability /// ## for check current service's availiability