Compare commits
44 Commits
master...feature/co
| SHA1 |
|---|
| aae0391a0c |
| 3dd238cf97 |
| 403285a937 |
| a16eb78b79 |
| 281841b68a |
| 34979a035d |
| 09c1baed8e |
| 3c22a67052 |
| 052448a7b9 |
| 6d56d1e39c |
| 541b0f52dd |
| 2495fb84cf |
| c3fd0dd09f |
| 502ea114a6 |
| 28092d945a |
| 0d68efd461 |
| 4fc90300fc |
| e3f07f42a6 |
| cd7669d942 |
| 721fa6c758 |
| c50c444f21 |
| 71acb4a32e |
| f504632c4d |
| 2b82fb7aac |
| 584404c050 |
| 886ae6308b |
| 8f1214bd9a |
| 011c479550 |
| 27e79ce731 |
| 026a502044 |
| 163887d42c |
| 35a21da431 |
| 064611823a |
| e4c3e5f46f |
| 6adab1b903 |
| 98da769dd3 |
| a8a7fd8a72 |
| 3d88967281 |
| 8c1998c93f |
| f560dfebc5 |
| 0f160f4dcd |
| 7a5704dd93 |
| df03bd5346 |
| e9b6abefdf |
@@ -4,3 +4,4 @@
 Cargo.lock
 hagent_test.sock
 release
+*.sock
@@ -1,6 +1,6 @@
 [package]
 name = "noxis-cli"
-version = "0.2.4"
+version = "0.2.7"
 edition = "2021"
 
 [dependencies]
@@ -2,11 +2,17 @@ use clap::{Parser, Subcommand};
 
 #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
 pub struct Cli {
+    #[arg(
+        short,
+        default_value="noxis-rs.sock",
+        help="explicit specify of NOXIS Socket file"
+    )]
+    pub socket : String,
     #[command(
         subcommand,
         help = "to manage Noxis work",
     )]
-    command : Commands,
+    pub command : Commands,
 }
 
 #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
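The new `socket` argument gives every invocation a configurable socket path with a default. A minimal standalone sketch of the same clap pattern (toy struct, not the crate's full `Cli`):

```rust
use clap::Parser;

#[derive(Debug, Parser)]
struct Cli {
    // `short` derives `-s` from the field name; the default value
    // applies whenever the flag is omitted.
    #[arg(short, default_value = "noxis-rs.sock")]
    socket: String,
}

fn main() {
    let default = Cli::parse_from(["noxis-cli"]);
    assert_eq!(default.socket, "noxis-rs.sock");

    let custom = Cli::parse_from(["noxis-cli", "-s", "/tmp/alt.sock"]);
    assert_eq!(custom.socket, "/tmp/alt.sock");
}
```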
@@ -50,13 +56,13 @@ pub struct StartAction {
         num_args = 1..,
         value_delimiter = ' '
     )]
-    flags : Vec<String>,
+    pub flags : Vec<String>,
 }
 
 #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
 pub struct ConfigCommand {
     #[command(subcommand)]
-    action : ConfigAction,
+    pub action : ConfigAction,
 }
 
 #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
@@ -83,12 +89,12 @@ pub struct LocalConfig {
         action,
         help = "to read following input as JSON",
     )]
-    is_json : bool,
+    pub is_json : bool,
     // value
     #[arg(
         help = "path to config file or config String (with --json flag)",
     )]
-    config : String,
+    pub config : String,
 }
 
 #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
@@ -96,16 +102,16 @@ pub struct ProcessCommand {
     #[arg(
         help = "name of needed process",
     )]
-    process : String,
+    pub process : String,
     #[command(
         subcommand,
         help = "To get current process's status",
     )]
-    action : ProcessAction,
+    pub action : ProcessAction,
 }
 
 #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
-enum ProcessAction {
+pub enum ProcessAction {
     #[command(
         about = "To get info about current process status",
     )]
@@ -1,14 +1,15 @@
 use thiserror::Error;
-use super::cli_net::NOXIS_RS_CREDS;
 
 #[derive(Debug, Error)]
 pub enum NoxisCliError {
-    #[error("Can't send any data to {:?}. Noxis-rs daemon is disabled or can't be accessed", NOXIS_RS_CREDS)]
-    NoxisDaemonMissing,
-    #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's web-functionality")]
+    #[error("Can't find socket `{0}`. Error : {1}")]
+    NoxisDaemonMissing(String, String),
+    #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")]
     PortIsNotWritable,
     #[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")]
     CliPromptCanNotBeSent,
     #[error("Can't parse CLI struct and send as byte stream")]
     ToStringCliParsingParsing,
+    #[error("Can't read Noxis response due to {0}")]
+    CliResponseReadError(String)
 }
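The reworked `NoxisDaemonMissing` variant now carries the socket path and the underlying OS error as tuple fields, which `thiserror` interpolates positionally. A self-contained sketch of that pattern (demo enum, not the crate's actual error type):

```rust
use thiserror::Error;

#[derive(Debug, Error)]
enum DemoError {
    // Tuple fields interpolate positionally as {0}, {1}, ...
    #[error("Can't find socket `{0}`. Error : {1}")]
    DaemonMissing(String, String),
}

fn main() {
    let er = DemoError::DaemonMissing("noxis-rs.sock".into(), "No such file".into());
    assert_eq!(
        er.to_string(),
        "Can't find socket `noxis-rs.sock`. Error : No such file"
    );
}
```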
@@ -1,32 +1,30 @@
-use tokio::net::TcpStream;
-use tokio::io::AsyncWriteExt;
+use tokio::net::UnixStream;
+use tokio::io::{AsyncWriteExt, AsyncReadExt};
 use tokio::time::{Duration, sleep};
 use anyhow::Result;
 use super::Cli;
 use super::cli_error::NoxisCliError;
 
-pub const NOXIS_RS_CREDS: &str = "127.0.0.1:7753";
-
-
-pub async fn create_tcp_stream() -> Result<TcpStream> {
-    Ok(TcpStream::connect(NOXIS_RS_CREDS).await.map_err(|_| NoxisCliError::NoxisDaemonMissing)?)
+async fn create_us_stream(cli: &Cli) -> Result<UnixStream> {
+    Ok(UnixStream::connect(&cli.socket).await.map_err(|er| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string(), er.to_string()))?)
 }
 
-pub async fn try_send(stream: Result<TcpStream>, params: Cli) -> Result<()> {
-    use serde_json::to_string;
-    let mut stream = stream.map_err(|_| NoxisCliError::NoxisDaemonMissing)?;
-    loop {
-        if stream.writable().await.is_err() {
-            sleep(Duration::from_millis(100)).await;
-            continue;
-        }
-        // let msg: Cli = from_str(&format!("{:?}", params))?;
-        let msg= to_string(&params).map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
-        // let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@";
-
-        stream.write_all(msg.as_bytes()).await.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
-        // ...
-        break;
-    }
+pub async fn try_send(cli: Cli) -> Result<()> {
+    // let stream = create_us_stream(&cli).await;
+    let mut stream = create_us_stream(&cli).await?;
+
+    let msg = serde_json::to_vec(&cli)
+        .map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
+
+    stream.write_all(&msg)
+        .await
+        .map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
+
+    let mut response = [0; 1024];
+    stream.read(&mut response)
+        .await
+        .map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?;
+
+    println!("Received response: {}", String::from_utf8_lossy(&response));
     Ok(())
 }
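Client-side, the retry loop over a TCP `writable()` check is gone: `try_send` now does a single connect/serialize/write/read round trip over a Unix domain socket. A minimal standalone sketch of that round trip (the hard-coded path and payload are illustrative only):

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Connect to a daemon listening on a Unix domain socket.
    let mut stream = UnixStream::connect("noxis-rs.sock").await?;

    // One write of the serialized request ...
    stream.write_all(br#"{"socket":"noxis-rs.sock","command":"status"}"#).await?;

    // ... then one read into a fixed buffer, as in the new try_send.
    let mut buf = [0u8; 1024];
    let n = stream.read(&mut buf).await?;
    println!("{}", String::from_utf8_lossy(&buf[..n]));
    Ok(())
}
```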
@@ -4,12 +4,12 @@ mod cli_error;
 
 use clap::Parser;
 use cli::Cli;
-use cli_net::{create_tcp_stream, try_send};
+use cli_net::try_send;
 use anyhow::Result;
 
 #[tokio::main]
 async fn main() -> Result<()>{
     let cli = Cli::parse();
-    try_send(create_tcp_stream().await, cli).await?;
+    try_send(cli).await?;
     Ok(())
 }
@@ -1,6 +1,6 @@
 [package]
 name = "noxis-rs"
-version = "0.11.10"
+version = "0.11.26"
 edition = "2021"
 
 [dependencies]
@@ -8,13 +8,15 @@ anyhow = "1.0.93"
 chrono = "0.4.38"
 clap = { version = "4.5.21", features = ["derive"] }
 env_logger = "0.11.3"
-inotify = "0.10.2"
+inotify = "0.11.0"
 log = "0.4.22"
 pcap = "2.2.0"
-redis = "0.25.4"
+redis = "0.29.2"
 serde = { version = "1.0.203", features = ["derive"] }
 serde_json = "1.0.118"
 sysinfo = "0.32.0"
 tokio = { version = "1.38.0", features = ["full", "time"] }
+noxis-cli = { path = "../noxis-cli" }
 dotenv = "0.15.0"
 futures = "0.3.31"
+async-trait = "0.1.88"
@@ -1,6 +1,6 @@
 {
-    "dateOfCreation": "1721381809104",
-    "configServer": "localhost",
+    "dateOfCreation": "1721381809112",
+    "configServer": "192.168.2.37",
     "processes": [
         {
             "name": "temp-process",
@@ -12,7 +12,7 @@
             "src": "./tests/examples/",
             "triggers": {
                 "onDelete": "stop",
-                "onChange": "stay"
+                "onChange": "restart"
             }
         }
     ],
@@ -22,8 +22,7 @@
             "port": 443,
             "triggers": {
                 "wait": 10,
-                "delay": 2,
-                "onLost": "hold"
+                "onLost": "restart"
             }
         }
     ]
@@ -1,7 +1,6 @@
 mod options;
 mod utils;
 
-use anyhow::Error;
 use clap::Parser;
 use log::{error, info};
 use options::config::*;
@@ -14,84 +13,140 @@ use std::time::Duration;
 use tokio::sync::mpsc;
 use utils::*;
 use options::preboot::PrebootParams;
+use tokio::sync::{broadcast, oneshot};
+use options::config::v2::init_config_mechanism;
+use utils::v2::init_monitoring;
 
-#[tokio::main(flavor = "multi_thread")]
+#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
 async fn main() -> anyhow::Result<()>{
     let preboot = Arc::new(PrebootParams::parse().validate()?);
 
     let _ = setup_logger();
 
-    info!("Runner is configurating...");
-
-    // setting up redis connection \
-    // then conf checks to choose the most actual \
-    let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
-        error!("No actual configuration for runner. Stopping...");
-        std::process::exit(1);
-    });
-
-    info!(
-        "Current runner configuration: {}",
-        &processes.date_of_creation
-    );
-    info!("Runner is ready. Initializing...");
-
-    if processes.processes.is_empty() {
-        error!("Processes list is null, runner-rs initialization is stopped");
-        return Err(Error::msg("Empty processes segment in config"));
-    }
+    info!("Noxis is configurating...");
+    //
+    let (tx_brd, mut rx_brd) = broadcast::channel::<Processes>(1);
+    // cli <-> config
+    let (tx_oneshot, rx_oneshot) = oneshot::channel::<Processes>();
     let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
-    // is in need to send to the signals handler thread
-    let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
-
-    for proc in processes.processes.iter() {
-        info!(
-            "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
-            proc.name,
-            proc.path,
-            proc.dependencies.files.len(),
-            proc.dependencies.services.len()
-        );
-
-        // creating msg channel
-        // can or should be executed in new thread
-        let (tx, mut rx) = mpsc::channel::<u8>(1);
-        let proc = Arc::new(proc.clone());
-        let tx = Arc::new(tx.clone());
-
-        senders.push(Arc::clone(&tx.clone()));
-
-        let event = tokio::spawn(async move {
-            run_daemons(proc.clone(), tx.clone(), &mut rx).await;
-        });
-        handler.push(event);
-    }
-
-    // destructor addition
-    handler.push(tokio::spawn(async move {
-        if set_valid_destructor(Arc::new(senders)).await.is_err() {
-            error!("Linux signals handler creation failed. Terminating main thread...");
-            return;
-        }
-
-        tokio::time::sleep(Duration::from_millis(200)).await;
-        info!("End of job. Terminating main thread...");
-        std::process::exit(0);
-    }));
-
-    // remote config update subscription
-    handler.push(tokio::spawn(async move {
-        let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
-    }));
-
-    // cli pipeline
-    handler.push(tokio::spawn(async move {
-        let _ = init_cli_pipeline().await;
-    }));
+    // initilaizing task for config manipulations
+    let config_module = tokio::spawn(async move {
+        let _ = init_config_mechanism(
+            rx_oneshot,
+            tx_brd,
+            preboot.clone()
+        ).await;
+    });
+    handler.push(config_module);
+
+    // initilaizing task for cli manipulation
+    let cli_module = tokio::spawn(async move {
+        if let Err(er) = init_cli_pipeline().await {
+            error!("CLI pipeline failed due to {}", er)
+        }
+    });
+    handler.push(cli_module);
+
+    // initilaizing task for deinitializing `Noxis`
+    let ctrlc = tokio::spawn(async move {
+        if let Err(er) = set_valid_destructor(vec![].into()).await {
+            error!("Destructor mod failed due to {}", er);
+        }
+        std::process::exit(0);
+    });
+    handler.push(ctrlc);
+
+    let monitoring = tokio::spawn(async move {
+        let config = {
+            let mut tick = tokio::time::interval(Duration::from_millis(500));
+            loop {
+                tick.tick().await;
+                break match rx_brd.try_recv() {
+                    Ok(conf) => conf,
+                    Err(_) => continue,
+                }
+            }
+        };
+        if let Err(er) = init_monitoring(config).await {
+            error!("Monitoring mod failed due to {}", er);
+        }
+    });
+    handler.push(monitoring);
 
     for i in handler {
         let _ = i.await;
     }
 
+    // setting up redis connection \
+    // then conf checks to choose the most actual \
+    // let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
+    //     error!("No actual configuration for runner. Stopping...");
+    //     std::process::exit(1);
+    // });
+    //
+    // info!(
+    //     "Current runner configuration: {}",
+    //     &processes.date_of_creation
+    // );
+    // info!("Runner is ready. Initializing...");
+    //
+    // if processes.processes.is_empty() {
+    //     error!("Processes list is null, runner-rs initialization is stopped");
+    //     return Err(Error::msg("Empty processes segment in config"));
+    // }
+    // let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
+    // // is in need to send to the signals handler thread
+    // let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
+    //
+    // for proc in processes.processes.iter() {
+    //     info!(
+    //         "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
+    //         proc.name,
+    //         proc.path,
+    //         proc.dependencies.files.len(),
+    //         proc.dependencies.services.len()
+    //     );
+    //
+    //     // creating msg channel
+    //     // can or should be executed in new thread
+    //     let (tx, mut rx) = mpsc::channel::<u8>(1);
+    //     let proc = Arc::new(proc.clone());
+    //     let tx = Arc::new(tx.clone());
+    //
+    //     senders.push(Arc::clone(&tx.clone()));
+    //
+    //     let event = tokio::spawn(async move {
+    //         run_daemons(proc.clone(), tx.clone(), &mut rx).await;
+    //     });
+    //     handler.push(event);
+    // }
+    //
+    // // destructor addition
+    // handler.push(tokio::spawn(async move {
+    //     if set_valid_destructor(Arc::new(senders)).await.is_err() {
+    //         error!("Linux signals handler creation failed. Terminating main thread...");
+    //         return;
+    //     }
+    //
+    //     tokio::time::sleep(Duration::from_millis(200)).await;
+    //     info!("End of job. Terminating main thread...");
+    //     std::process::exit(0);
+    // }));
+    //
+    // // remote config update subscription
+    // handler.push(tokio::spawn(async move {
+    //     let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
+    // }));
+    //
+    // // cli pipeline
+    // handler.push(tokio::spawn(async move {
+    //     let _ = init_cli_pipeline().await;
+    // }));
+    //
+    // for i in handler {
+    //     let _ = i.await;
+    // }
     Ok(())
 }
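The rewritten `main` wires the config, CLI, destructor, and monitoring modules together with channels: a `broadcast` channel fans the loaded config out to every module, and a `oneshot` channel carries configs pushed in from the CLI. A minimal sketch of the broadcast fan-out (toy `String` payload, not the real `Processes`):

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // One sender, many receivers: every subscriber gets its own copy
    // of each value sent after it subscribed.
    let (tx, mut monitoring_rx) = broadcast::channel::<String>(1);
    let mut pubsub_rx = tx.subscribe();

    tx.send("config-v1".to_string()).unwrap();

    assert_eq!(monitoring_rx.recv().await.unwrap(), "config-v1");
    assert_eq!(pubsub_rx.recv().await.unwrap(), "config-v1");
}
```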
@@ -1,12 +1,9 @@
-use log::{error, info, warn};
-use tokio::net::{TcpListener, TcpStream};
-use anyhow::{Result as DynResult, Error};
+use log::{error, info};
+use tokio::net::{ UnixStream, UnixListener };
 use tokio::time::{sleep, Duration};
-use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}};
-// use std::io::BufReader;
-use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
+use std::fs;
+use tokio::io::{ AsyncWriteExt, AsyncReadExt};
 use noxis_cli::Cli;
-use serde_json::from_str;
 
 /// # Fn `init_cli_pipeline`
 /// ## for catching all input requests from CLI
@@ -21,49 +18,32 @@ use serde_json::from_str;
 ///
 /// *depends on* : -
 ///
-pub async fn init_cli_pipeline() -> DynResult<()> {
-    match init_listener().await {
-        Some(list) => {
+pub async fn init_cli_pipeline() -> anyhow::Result<()> {
+    let socket_path = "noxis.sock";
+    let _ = fs::remove_file(socket_path);
+
+    match UnixListener::bind(socket_path) {
+        Ok(list) => {
+            // TODO: remove `unwrap`s
+            info!("Listening on {}", socket_path);
             loop {
-                if let Ok((socket, addr)) = list.accept().await {
-                    // isolation
-                    if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() {
-                        warn!("Declined attempt to connect TCP-socket from {}", addr);
-                        continue;
-                    }
-                    process_connection(socket).await;
+                match list.accept().await {
+                    Ok((socket, _)) => {
+                        // tokio::spawn();
+                        process_connection(socket).await;
+                    },
+                    Err(er) => {
+                        error!("Cannot poll connection to CLI due to {}", er);
+                        sleep(Duration::from_millis(300)).await;
+                    },
                 }
-                sleep(Duration::from_millis(500)).await;
             }
             // Ok(())
         },
-        None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use"))
+        Err(er) => {
+            error!("Failed to open UnixListener for CLI");
+            Err(er.into())
+        },
     }
 }
 
-/// # Fn `init_listener`
-/// ## for creating TCP-listener for communicating with CLI
-///
-/// *input* : -
-///
-/// *output* : `Some<TcpListener>` if port 7753 was opened | None if not
-///
-/// *initiator* : fn `init_cli_pipeline`
-///
-/// *managing* : `TcpListener` object to handle requests
-///
-/// *depends on* : `tokio::net::TcpListener`
-///
-async fn init_listener() -> Option<TcpListener> {
-    match TcpListener::bind("127.0.0.1:7753").await {
-        Ok(listener) => {
-            info!("Runner is listening localhost:7753");
-            Some(listener)
-        },
-        Err(_) => {
-            error!("Cannot create TCP listener for CLI");
-            None
-        }
-    }
-}
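Server-side, the listener moves from `TcpListener` on 127.0.0.1:7753 to a `UnixListener`, which also makes the per-connection loopback check unnecessary, since filesystem permissions guard the socket instead. A Unix socket leaves its file behind on exit, hence the `fs::remove_file` before binding. A minimal echo-style sketch of the same accept loop (hypothetical `echo.sock` path):

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let path = "echo.sock";
    // The socket file survives a crash, so clear any stale one first.
    let _ = std::fs::remove_file(path);

    let listener = UnixListener::bind(path)?;
    loop {
        let (mut stream, _addr) = listener.accept().await?;
        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            if let Ok(n) = stream.read(&mut buf).await {
                let _ = stream.write_all(&buf[..n]).await; // echo back
            }
        });
    }
}
```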
@@ -80,27 +60,29 @@ async fn init_listener() -> Option<TcpListener> {
 ///
 /// *depends on* : `tokio::net::TcpStream`
 ///
-async fn process_connection(mut stream: TcpStream) {
-    let buf_reader = BufReader::new(stream.borrow_mut());
-    let mut rqst = buf_reader.lines();
-
-
-    while let Ok(Some(line)) = rqst.next_line().await {
-        if line.is_empty() {
-            break
-        }
-        match from_str::<Cli>(&line) {
-            Ok(req) => {
-                // TODO: func wrapper
-                dbg!(req);
-            },
-            Err(_) => {
-                break
-            },
-        }
-        println!("{}", line);
-    }
-
-    let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!";
-    stream.write_all(response.as_bytes()).await.unwrap();
+async fn process_connection(mut stream: UnixStream) {
+    let mut buf = vec![0; 1024];
+    match stream.read(&mut buf).await {
+        Ok(0) => {
+            info!("Client disconnected ");
+        },
+        Ok(n) => {
+            buf.truncate(n);
+            info!("CLI have sent {} bytes", n);
+            match serde_json::from_slice::<Cli>(&buf) {
+                Ok(cli) => {
+                    info!("Received CLI request: {:?}", cli);
+                    let response = "OK";
+                    if let Err(e) = stream.write_all(response.as_bytes()).await {
+                        error!("Failed to send response: {}", e);
+                    }
+                }
+                Err(e) => {
+                    error!("Failed to parse CLI request: {}", e);
+                }
+            }
+        },
+        Err(e) => error!("Failed to read from socket: {}", e),
+    }
+
+    let _ = stream.shutdown().await;
 }
@@ -9,9 +9,393 @@ use std::sync::Arc;
 use std::{env, fs};
 use super::preboot::PrebootParams;
 use tokio::time::{Duration, sleep};
 // use redis::PubSub;
+use tokio::sync::{
+    oneshot,
+    oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender },
+    broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver };
+use crate::utils::files::create_watcher;
+use std::fs::File;
+use inotify::EventMask;
 
 // const CONFIG_PATH: &str = "settings.json";
 
+pub mod v2 {
+    use std::path::PathBuf;
+    use crate::utils::get_container_id;
+
+    use super::*;
+
+    pub async fn init_config_mechanism(
+        // to handle cli config changes
+        cli_oneshot: OneShotReciever<Processes>,
+        // to share local config with PRCS, CLI_PIPELINE and CONFIG modules
+        brd_tx : BroadcastSender<Processes>,
+        // preboot params (args)
+        params : Arc<PrebootParams>
+        /*...*/
+    ) {
+        // channel for pubsub to handle local config pulling
+        let local_config_brd_reciever = brd_tx.subscribe();
+        // channel between pub-sub mech and local config mech
+        let (tx_pb_lc, rx_pb_lc) = oneshot::channel::<bool>();
+        // channel between cli mech and local config mech
+        let (tx_cli_lc, rx_cli_lc) = oneshot::channel::<bool>();
+
+        // dbg!("before lc");
+        let params_clone = params.clone();
+        let for_lc_path = params.clone();
+        let lc_path = for_lc_path
+            .config
+            .to_str()
+            .unwrap_or("settings.json");
+
+        // future to init work with local config
+        let lc_future = tokio::spawn(
+            // let params = params.clone();
+            local_config_reciever(
+                params_clone,
+                rx_pb_lc,
+                rx_cli_lc,
+                Arc::new(brd_tx)
+            )
+        );
+        // dbg!("before pb");
+        // future to init work with pub sub mechanism
+        let pubsub_future = tokio::spawn(
+            pubsub_config_reciever(
+                tx_pb_lc,
+                params.clone(),
+                local_config_brd_reciever
+            )
+        );
+
+        // dbg!("before cli");
+        // future to catch new configs from cli pipeline
+        let cli_future = tokio::spawn(
+            from_cli_config_reciever(
+                cli_oneshot,
+                tx_cli_lc
+            )
+        );
+        // let _ = lc_future.await;
+        // dbg!("before select");
+        tokio::select! {
+            lc_result = lc_future => {
+                // dbg!("end of lc");
+                match lc_result {
+                    Ok(res) => {
+                        if res.is_ok() {
+                            info!("Local config warding mechanism stopped, waiting for others ...");
+                            sleep(Duration::from_millis(500)).await;
+                            let _ = restart_main_thread();
+                        }
+                        else {
+                            error!("Local config warding mechanism crushed, restarting ...");
+                            let _ = restart_main_thread();
+                        }
+                    },
+                    Err(_) => {
+                        error!("Local config warding mechanism crushed, restarting ...");
+                        let _ = restart_main_thread();
+                    },
+                }
+            },
+            pb_result = pubsub_future => {
+                match pb_result {
+                    Ok(res) => {
+                        if res.is_ok() {
+                            info!("New config was saved locally, restarting ...");
+                        }
+                        else {
+                            error!("Pubsub mechanism crushed, restarting ...");
+                        }
+                    },
+                    Err(_) => {
+                        error!("Pubsub mechanism crushed, restarting ...");
+                    },
+                }
+                let _ = restart_main_thread();
+            },
+            cli_config_option = cli_future => {
+                match cli_config_option {
+                    Err(_) => error!("CLI pulling new config mechanism crushed, restarting ..."),
+                    Ok(option_config) => {
+                        match option_config {
+                            None => error!("CLI pulling new config mechanism crushed, restarting ..."),
+                            Some(config) => {
+                                info!("New config was pulled from CLI, saving and restarting ...");
+                                let _ = save_new_config(&config, lc_path);
+                            },
+                        }
+                    },
+                }
+                let _ = restart_main_thread();
+            },
+        }
+        // dbg!("after select");
+        // TODO! futures + select! [OK]
+        // TODO! tests config
+    }
+    pub async fn get_redis_connection(params: &str) -> Option<Connection> {
+        for i in 1..=3 {
+            let redis_url = format!("redis://{}/", params);
+            info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i);
+            if let Ok(client) = Client::open(redis_url) {
+                if let Ok(conn) = client.get_connection() {
+                    info!("Successfully opened Redis connection");
+                    return Some(conn);
+                }
+            }
+            error!("Error with subscribing Redis stream on update. Retrying in 5 secs...");
+            sleep(Duration::from_secs(5)).await;
+        }
+        None
+    }
+
+    // loop checking redis pubsub
+    async fn pubsub_config_reciever(
+        // to stop checking local config
+        local_conf_tx : OneShotSender<bool>,
+        params : Arc<PrebootParams>,
+        tx_brd_local : BroadcastReceiver<Processes>,
+    ) -> anyhow::Result<()>{
+        /*...*/
+        // dbg!("start of pb");
+        let mut tx_brd_local = tx_brd_local;
+        let local_config = if !tx_brd_local.is_empty() {
+            tx_brd_local.recv().await?
+        } else {
+            // Processes::default()
+            let mut tick = tokio::time::interval(Duration::from_millis(500));
+            loop {
+                tick.tick().await;
+                break match tx_brd_local.recv().await {
+                    Ok(conf) => conf,
+                    Err(_) => continue,
+                };
+            }
+        };
+        match get_redis_connection(&local_config.config_server).await {
+            Some(mut conn) => {
+                let mut pub_sub = conn.as_pubsub();
+                let channel_name = get_container_id().unwrap_or(String::from("default"));
+                let channel_name = channel_name.trim();
+                match pub_sub.subscribe(channel_name) {
+                    Err(er) => {
+                        error!("Cannot subscribe pubsub channel due to {}", &er);
+                        return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
+                    },
+                    Ok(_) => {
+                        info!("Successfully subscribed to {} pubsub channel", channel_name);
+                        let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3)));
+                        loop {
+                            if let Ok(msg) = pub_sub.get_message() {
+                                // dbg!("ok on get message");
+                                let payload : Result<String, _> = msg.get_payload();
+                                match payload {
+                                    Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
+                                    Ok(payload) => {
+                                        if let Some(remote) = parse_extern_config(&payload) {
+                                            match config_comparing(&local_config, &remote) {
+                                                ConfigActuality::Local => {
+                                                    warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
+                                                },
+                                                ConfigActuality::Remote => {
+                                                    info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
+                                                    // to stop watching local config file mechanism
+                                                    let _ = local_conf_tx.send(true);
+                                                    let config_path = params.config.to_str().unwrap_or("settings.json");
+
+                                                    if save_new_config(&remote, &config_path).is_err() {
+                                                        error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
+                                                        return Err(anyhow::Error::msg(
+                                                            format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
+                                                        ))
+                                                    }
+                                                    return Ok(());
+                                                },
+                                            }
+                                        }
+                                        else {
+                                            warn!("Invalid config was pulled from Redis channel")
+                                        }
+                                    },
+                                }
+                            }
+                            // delay
+                            tokio::task::yield_now().await;
+                        }
+                    },
+                }
+            },
+            None => {
+                sleep(Duration::from_secs(20)).await;
+            }
+        }
+        Ok(())
+    }
+
+    //
+    async fn local_config_reciever(
+        params : Arc<PrebootParams>,
+        pubsub_oneshot : OneShotReciever<bool>,
+        cli_oneshot : OneShotReciever<bool>,
+        brd_tx : Arc<BroadcastSender<Processes>>,
+        /*...*/
+    ) -> anyhow::Result<()> {
+        /*...*/
+        // shadowing as mut
+        let mut pubsub_oneshot = pubsub_oneshot;
+        let mut cli_oneshot = cli_oneshot;
+        // fill with default empty config, mut to change later
+        let mut _current_config = Processes::default();
+        // PathBuf to &str to work with local config path as slice
+        let local_config_path = params
+            .config
+            .to_str()
+            .unwrap_or("settings.json");
+
+        match load_processes(local_config_path) {
+            // if local exists
+            Some(conf) => {
+                info!("Local config `{}` was found.", &conf.date_of_creation);
+                _current_config = conf;
+                if let Err(er) = brd_tx.send(_current_config.clone()) {
+                    error!("Cannot share local config with broadcast due to {}", er);
+                }
+            },
+            // if local is not exist
+            None => {
+                warn!("Local config wasn't found. Waiting for new ...");
+                return Err(anyhow::Error::msg("No local config"));
+                // ...
+            },
+        }
+
+        // 100% local exists here
+        // create watcher on local config file
+        match create_watcher("", local_config_path) {
+            Ok(mut watcher) => {
+                loop {
+                    let mut need_to_export_config = false;
+                    // let mut need_to_recreate_watcher = false;
+                    // return situations here
+                    // 1) oneshot signal
+                    // 2) if config was deleted -> recreate and fill with current config that is held here
+                    // 3) if config was changed -> fill with current config that is held here
+
+                    // catching signal from pubsub
+                    // it's because pubsub mech pulled new valid and actual config and now it's time to ...
+                    // ... overwrite local config file and restart main thread
+                    if let Ok(_) = pubsub_oneshot.try_recv() {
+                        sleep(Duration::from_secs(1)).await;
+                        return Ok(());
+                    }
+
+                    // catching signal from cli
+                    // it's because cli mech pulled new valid and actual config and now it's time to ...
+                    // ... overwrite local config file and restart main thread (like in previous mechanism)
+                    if let Ok(_) = cli_oneshot.try_recv() {
+                        sleep(Duration::from_secs(1)).await;
+                        return Ok(());
+                    }
+
+                    // ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ...
+                    // ! ... FROM WATCHER"S BUFFER
+
+                    // existing check
+                    if !params.config.exists() {
+                        warn!("Local config file was deleted or moved. Recreating new one with saved data ...");
+                        need_to_export_config = true;
+                        // need_to_recreate_watcher = true;
+                    } else {
+                        // changes check
+                        let mut buffer = [0; 128];
+                        let events = watcher.read_events(&mut buffer);
+                        if events.is_ok() {
+                            let events: Vec<EventMask> = events
+                                .unwrap()
+                                .map(|mask| mask.mask)
+                                .filter(|mask| {
+                                    *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
+                                })
+                                .collect();
+                            if !events.is_empty() {
+                                warn!("Local config file was overwritten. Discarding changes ...");
+                                need_to_export_config = true;
+                                // events
+                                //     .iter()
+                                //     .any(|event| *event == EventMask::DELETE_SELF)
+                                //     .then(|| need_to_recreate_watcher = true);
+                            }
+                        }
+                    }
+                    // exporting data
+                    if need_to_export_config {
+                        if let Err(er) = export_saved_config_data_locally(&params.config, &_current_config).await {
+                            error!("Cannot save actual imported config due to {}", er);
+                        } else {
+                            // recreation watcher (draining activity buffer mechanism)
+                            // if local config file was deleted and recreated
+                            // if local config file was modified locally
+                            match create_watcher("", local_config_path) {
+                                Ok(new) => watcher = new,
+                                Err(er) => error!("Cannot create new watcher due to {}", er),
+                            }
+                        }
+                    }
+                    sleep(Duration::from_millis(300)).await;
+                    // tokio::task::yield_now().await;
+                }
+            },
+            Err(_) => {
+                error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path);
+                return Err(anyhow::Error::msg("Cannot create watcher on local config file"));
+            },
+        }
+
+    }
+
+    // [:IN-TEST]
+    async fn from_cli_config_reciever(
+        cli_oneshot: OneShotReciever<Processes>,
+        to_local_tx: OneShotSender<bool>
+    ) -> Option<Processes> {
+        /* match awaits til channel*/
+        // dbg!("start of cli");
+        loop {
+            if !cli_oneshot.is_empty() {
+                match cli_oneshot.await {
+                    Ok(config_from_cli) => {
+                        info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
+                        let _ = to_local_tx.send(true);
+                        return Some(config_from_cli)
+                    },
+                    _ => return None,
+                }
+            }
+            sleep(Duration::from_millis(300)).await;
+        }
+
+    }
+
+    async fn export_saved_config_data_locally(
+        config_file_path: &PathBuf,
+        current_config: &Processes
+    ) -> anyhow::Result<()> {
+
+        let mut file = File::create(config_file_path)?;
+        file.write_all(
+            serde_json::to_string_pretty(current_config)?.as_bytes()
+        )?;
+        Ok(())
+        // Ok(())
+    }
+}
 
 
 /// # Fn `load_processes`
 /// ## for reading and parsing *local* storing config
 ///
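The pubsub receiver blocks on the `redis` crate's synchronous `get_message`, so the diff sets a read timeout to turn it into a poll that the surrounding task can interleave with other work. A stripped-down sketch of that subscribe/poll loop (the localhost URL and channel name are placeholders):

```rust
use std::time::Duration;

fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut conn = client.get_connection()?;
    let mut pubsub = conn.as_pubsub();
    pubsub.subscribe("default")?;

    // Without a timeout get_message blocks forever; with one, the loop
    // wakes up periodically and can check for shutdown, as in the diff.
    pubsub.set_read_timeout(Some(Duration::from_secs(3)))?;
    loop {
        match pubsub.get_message() {
            Ok(msg) => {
                let payload: String = msg.get_payload()?;
                println!("new config payload: {payload}");
            }
            Err(_) => continue, // timed out: nothing published yet
        }
    }
}
```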
@@ -54,14 +438,14 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
         error!("Invalid character in config file. Config path was set to default");
         "settings.json"
     });
-    info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url);
+    info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url);
     match load_processes(config_path) {
         Some(local_conf) => {
             info!(
                 "Found local configuration, version - {}",
                 &local_conf.date_of_creation
             );
-            if !params.no_remote_config {
+            if !params.no_sub {
                 if let Some(remote_conf) =
                     // TODO : rework with pubsub mech
                     once_get_remote_configuration(&format!("redis://{}/", &params.remote_server_url))
@@ -85,7 +469,7 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
         }
         None => {
             warn!("No local valid conf was found. Trying to pull remote one...");
-            if !params.no_remote_config {
+            if !params.no_sub {
                 let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", &params.remote_server_url)));
                 if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
                     info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
@@ -322,7 +706,7 @@ fn restart_main_thread() -> std::io::Result<()> {
 pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
     let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
 
-    if params.no_sub || params.no_remote_config {
+    if params.no_sub {
         return Err(CustomError::Fatal);
     }
     if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) {
@@ -397,6 +781,9 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
 /// *depends on* : `Processes`, `ConfigActuality`
 ///
 fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
+    if local.is_default() {
+        return ConfigActuality::Remote;
+    }
     let local_date: u64 = local.date_of_creation.parse().unwrap();
     let remote_date: u64 = remote.date_of_creation.parse().unwrap();
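`config_comparing` now short-circuits when the local config is the empty default, so a node that booted without a local file always accepts the first remote config. A sketch of the whole comparison, under the assumption (suggested by the surrounding code but not shown in this hunk) that the larger `date_of_creation` timestamp wins:

```rust
enum ConfigActuality { Local, Remote }

struct Processes { date_of_creation: String }

impl Processes {
    fn is_default(&self) -> bool { self.date_of_creation.is_empty() }
}

fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
    // A freshly booted node with only the empty default config
    // always defers to whatever the remote side offers.
    if local.is_default() {
        return ConfigActuality::Remote;
    }
    // date_of_creation is a millisecond timestamp stored as a string.
    let local_date: u64 = local.date_of_creation.parse().unwrap_or(0);
    let remote_date: u64 = remote.date_of_creation.parse().unwrap_or(0);
    if remote_date > local_date { ConfigActuality::Remote } else { ConfigActuality::Local }
}

fn main() {
    let booted = Processes { date_of_creation: String::new() };
    let remote = Processes { date_of_creation: "1721381809112".into() };
    assert!(matches!(config_comparing(&booted, &remote), ConfigActuality::Remote));
}
```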
@@ -13,7 +13,7 @@ enum EnvVars {
     NoxisNoHagent,
     NoxisNoLogs,
     NoxisRefreshLogs,
-    NoxisNoRemoteConfig,
+    // NoxisNoRemoteConfig,
     NoxisNoConfigSub,
     NoxisSocketPath,
     NoxisLogTo,
@@ -29,7 +29,7 @@ impl std::fmt::Display for EnvVars {
     EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"),
     EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"),
     EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"),
-    EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"),
+    // EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"),
     EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"),
     EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"),
     EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"),
@@ -48,7 +48,7 @@ impl<'a> EnvVars {
     EnvVars::NoxisNoHagent => "false",
     EnvVars::NoxisNoLogs => "false",
     EnvVars::NoxisRefreshLogs => "false",
-    EnvVars::NoxisNoRemoteConfig => "false",
+    // EnvVars::NoxisNoRemoteConfig => "false",
     EnvVars::NoxisNoConfigSub => "false",
     EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock",
     EnvVars::NoxisLogTo => "./",
@@ -77,7 +77,7 @@ impl<'a> EnvVars {
     Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string());
     Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string());
     Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string());
-    Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string());
+    // Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string());
     Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string());
     Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap());
     Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap());
@@ -147,12 +147,6 @@ impl std::fmt::Display for MetricsPrebootParams {
 /// noxis-rs ... --refresh-logs ...
 /// ```
 ///
-/// `--no-remote-config` - to disable work with Redis as config producer
-/// ### usage :
-/// ``` bash
-/// noxis-rs ... --no-remote-config ...
-/// ```
-///
 /// `--no-sub` - to disable Redis subscribtion mechanism
 /// ### usage :
 /// ``` bash
@@ -212,17 +206,18 @@ pub struct PrebootParams {
         help="To clear logs directory"
     )]
     pub refresh_logs : bool,
-    #[arg(
-        long = "no-remote-config",
-        action,
-        help="To disable work with remote config server",
-        conflicts_with="no_sub")]
-    pub no_remote_config : bool,
+    // #[arg(
+    //     long = "no-remote-config",
+    //     action,
+    //     help="To disable work with remote config server",
+    //     conflicts_with="no_sub")]
+    // pub no_remote_config : bool,
     #[arg(
         long = "no-sub",
         action,
-        help="To disable subscription mechanism",
-        conflicts_with="no_remote_config")]
+        help="To disable Redis subscription mechanism",
+    )]
+    // conflicts_with="no_remote_config"
     pub no_sub : bool,
 
     // params (socket_path, log_to, remote_server_url, config)
@@ -243,7 +238,7 @@ pub struct PrebootParams {
     #[arg(
         long = "remote-server-url",
         default_value="localhost",
-        conflicts_with="no_remote_config",
+        conflicts_with="no_sub",
         help = "To set url of remote config server using in remote config pulling mechanism"
     )]
     pub remote_server_url : String,
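`remote-server-url` now conflicts with `no_sub` instead of the removed `no_remote_config` flag; clap enforces the conflict at parse time, which is what the updated unit test further down asserts. A self-contained sketch of the mechanism (toy `Params` struct):

```rust
use clap::Parser;

#[derive(Debug, Parser)]
struct Params {
    #[arg(long = "no-sub", action)]
    no_sub: bool,
    // conflicts_with names the *field* of the other argument, so
    // supplying both flags explicitly fails at parse time.
    #[arg(long = "remote-server-url",
          default_value = "localhost",
          conflicts_with = "no_sub")]
    remote_server_url: String,
}

fn main() {
    // A default value does not count as "present", so this is fine:
    assert!(Params::try_parse_from(["app", "--no-sub"]).is_ok());
    // ...but passing both explicit flags is rejected:
    assert!(Params::try_parse_from(
        ["app", "--no-sub", "--remote-server-url", "redis://127.0.0.1"]
    ).is_err());
}
```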
@@ -288,15 +283,17 @@ impl PrebootParams {
         // existing log dir
         if !self.log_to.exists() && !self.no_logs {
             eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default");
             self.refresh_logs = false;
             self.log_to = PathBuf::from("./");
             // return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
         }
         // existing sock file
         if !self.config.exists() {
             eprintln!("Error: Invalid character in config file. Config path was set to default");
-            let config = PathBuf::from("/etc/settings.json");
-            if !config.exists() && self.no_remote_config {
-                return Err(Error::msg("Noxis cannot run without config. Create local config or enable remote-config mechanism"));
+            // TODO : ??? wtf is going with 2 paths
+            let config = PathBuf::from("/etc/enode/noxis/settings.json");
+            if !config.exists() && self.no_sub {
+                return Err(Error::msg("Noxis cannot run without config. Create local config or enable pubsub mechanism"));
             }
             self.config = PathBuf::from("settings.json");
             // return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
@@ -353,20 +350,20 @@ mod preboot_unitests{
             "runner-rs",
             "--no-sub",
             "--remote-server-url", "redis://127.0.0.1"
         ]).is_ok())
     }
-    #[test]
-    fn parsing_config_invalid_args_noremote_nosub() {
-        assert!(PrebootParams::try_parse_from(vec![
-            "runner-rs",
-            "--no-remote-config", "--no-sub"
-        ]).is_err())
-    }
+    // #[test]
+    // fn parsing_config_invalid_args_noremote_nosub() {
+    //     assert!(PrebootParams::try_parse_from(vec![
+    //         "runner-rs",
+    //         "--no-remote-config", "--no-sub"
+    //     ]).is_err())
+    // }
     #[test]
     fn parsing_config_invalid_args_noremote_remoteurl() {
         assert!(PrebootParams::try_parse_from(vec![
             "runner-rs",
-            "--no-remote-config",
+            "--no-sub",
             "--remote-server-url", "redis://127.0.0.1"
         ]).is_err())
     }
@@ -22,7 +22,7 @@ type SendersVec = Arc<Vec<Arc<mpsc::Sender<u8>>>>;
 ///
 /// *depends on* : Sig, Signals
 ///
-pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError> {
+pub async fn set_valid_destructor(senders: SendersVec) -> anyhow::Result<()> {
     let (mut int, mut term, mut stop) = (
         Sig::new(Signals::Sigint, senders.clone()),
         Sig::new(Signals::Sigterm, senders.clone()),
@@ -2,7 +2,114 @@
 
 use std::net::Ipv4Addr;
 use serde::{Deserialize, Serialize};
+use async_trait::async_trait;
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub enum DependencyType {
+    File,
+    Service,
+}
+
+#[derive(Debug)]
+pub enum ServiceState {
+    Ok,
+    Unavailable
+}
+pub struct ServiceWaitConfig(u32);
+
+impl Default for ServiceWaitConfig {
+    fn default() -> Self {
+        Self(5)
+    }
+}
+
+pub enum FileTriggerType {
+    OnChange,
+    OnDelete,
+}
+
+impl std::fmt::Display for FileTriggerType {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        return match self {
+            FileTriggerType::OnChange => write!(f, "File was changed"),
+            FileTriggerType::OnDelete => write!(f, "File was moved or deleted"),
+        }
+    }
+}
+
+impl<'a> FileTriggerType {
+    pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
+        return match self {
+            FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
+            FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger)),
+        }
+    }
+    pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
+        return match self {
+            FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
+            FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger.on_delete.clone())),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum Triggers {
+    File { on_change: Arc<str>, on_delete: Arc<str> },
+    Service {on_lost: Arc<str>, wait: u32},
+}
+
+impl Triggers {
+    pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers {
+        Triggers::File { on_change, on_delete }
+    }
+    pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers {
+        Triggers::Service{on_lost, wait: wait_time}
+    }
+    pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> {
+        if let Triggers::Service { on_lost, .. } = self {
+            return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone())))
+        }
+        None
+    }
+}
+
+#[derive(Debug)]
+pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> }
+pub struct ServiceTriggersForController(Arc<str>);
+
+impl std::fmt::Display for DependencyType {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        return match self {
+            DependencyType::File => write!(f, "File"),
+            DependencyType::Service => write!(f, "Service"),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum ProcessState {
+    Pending,
+    Holding,
+    Stopped,
+    StoppedByCli,
+}
+#[derive(Debug)]
+pub enum Events {
+    Positive(Arc<str>),
+    Negative(NegativeOutcomes)
+}
+#[derive(Debug)]
+pub enum NegativeOutcomes {
+    FileWasChanged(Arc<str>, DependencyType, Arc<str>),
+    FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>),
+    ServiceIsUnreachable(Arc<str>, DependencyType, Arc<str>),
+}
+
+#[async_trait]
+pub trait ProcessUnit {
+    async fn process(&mut self);
+}
 /// # an Error enum (next will be deleted and replaced)
 pub enum CustomError {
     Fatal,
@@ -40,6 +147,22 @@ pub struct Processes {
     pub processes: Vec<TrackingProcess>,
 }
 
+impl Default for Processes {
+    fn default() -> Self {
+        Self {
+            date_of_creation : String::new(),
+            config_server : String::from("default"),
+            processes : Vec::new(),
+        }
+    }
+}
+
+impl Processes {
+    pub fn is_default(&self) -> bool {
+        self.date_of_creation.is_empty()
+    }
+}
+
 /// # Struct for the 2nd level in json conf file
 /// ## for each process to contain info, such as name, path and dependencies
 ///
|
@ -135,7 +258,7 @@ pub struct Files {
|
|||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Services {
|
||||
pub hostname: String,
|
||||
pub port: u32,
|
||||
pub port: Option<u32>,
|
||||
pub triggers: ServiceTriggers,
|
||||
}
|
||||
|
||||
|
|
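Switching `port` to `Option<u32>` lets a service entry omit the field entirely: serde deserializes a missing key to `None` rather than rejecting the whole config. A minimal sketch (toy struct mirroring just these two fields):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Services {
    hostname: String,
    // A missing "port" key now deserializes to None instead of
    // failing the whole config parse.
    port: Option<u32>,
}

fn main() {
    let with_port: Services =
        serde_json::from_str(r#"{"hostname":"db","port":443}"#).unwrap();
    let without_port: Services =
        serde_json::from_str(r#"{"hostname":"db"}"#).unwrap();
    assert_eq!(with_port.port, Some(443));
    assert_eq!(without_port.port, None);
}
```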
@@ -159,7 +282,6 @@ pub struct Services {
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct ServiceTriggers {
     pub wait: u32,
-    pub delay: u32,
     #[serde(rename = "onLost")]
     pub on_lost: String,
 }
@@ -6,25 +6,207 @@ pub mod services;
 
 // TODO : saving current flags state
 
-use crate::options::structs::CustomError;
-use crate::options::structs::TrackingProcess;
-use files::create_watcher;
-use files::file_handler;
-use inotify::Inotify;
-use log::{error, warn};
+use crate::options::structs::{CustomError, TrackingProcess, Processes};
+// use files::create_watcher;
+// use files::file_handler;
+// use inotify::Inotify;
+use log::{error, warn, info};
 use prcs::{
     freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
     unfreeze_process,
 };
-use services::service_handler;
+// use services::service_handler;
 use std::process::Command;
 use std::sync::Arc;
-use tokio::join;
+// use tokio::join;
 use tokio::sync::mpsc;
 use tokio::time::Duration;
+// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
+// controllers import
+use prcs::v2::ProcessesController;
+use files::v2::FilesController;
+use services::v2::ServicesController;
+use async_trait::async_trait;
 
 const GET_ID_CMD: &str = "hostname";
 
+pub mod v2 {
+    use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
+    use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
+    use super::*;
+
+    #[derive(Debug)]
+    enum ControllerResult {
+        Process(Option<ProcessesController>),
+        File(Option<FilesController>),
+        Service(Option<ServicesController>),
+    }
+
+    #[derive(Debug)]
+    struct Supervisor {
+        prcs : LinkedList<ProcessesController>,
+        files : LinkedList<FilesController>,
+        services : LinkedList<ServicesController>,
+    }
+
+    impl Supervisor {
+        pub fn new() -> Supervisor {
+            Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
+        }
+        pub async fn with_config(mut self, config: &Processes) -> Supervisor {
+            let _ = config.processes.iter()
+                .for_each(|prc| {
+                    let (rx, tx) = mpsc::channel::<Events>(10);
+                    let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
+                    if !self.prcs.contains(&temp) {
+                        self.prcs.push_back(temp);
+                    }
+                    let rx = Arc::new(rx);
+                    let proc_name: Arc<str> = Arc::from(prc.name.clone());
+
+                    let _ = prc.dependencies.files.iter()
+                        .for_each(|file| {
+                            let mut hm = HashMap::new();
+                            let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())};
+                            hm.insert(proc_name.clone(), (triggers, rx.clone()));
+
+                            let tempfile = FilesController::new(&file.filename.as_str(), hm)
+                                .with_path(&file.src);
+
+                            if let Ok(file) = tempfile {
+                                if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) {
+                                    current_file.add_event(file);
+                                } else {
+                                    self.files.push_back(file);
+                                }
+                            }
+                        });
+
+                    // servs
+                    let _ = prc.dependencies.services.iter()
+                        .for_each(|serv| {
+                            let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref());
+                            // preparations
+                            let rx = rx.clone();
+                            let serv_cont = ServicesController::new().with_access_name(
+                                &serv.hostname,
+                                &access_url
+                            );
+                            // triggers
+                            let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone());
+                            let triggers = Triggers::new_service(arc, serv.triggers.wait);
+
+                            if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
+                                proc.add_process(&prc.name, triggers, rx);
+                            } else {
+                                // vecdeque for queue
+                                let mut vec: VecDeque<Arc<str>> = VecDeque::new();
+                                vec.push_back(proc_name.clone());
+                                // connection_queue
+                                let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> = BTreeMap::new();
+                                connection_queue.insert(serv.triggers.wait, vec);
+                                // event_reg
+                                let mut hm = HashMap::new();
+                                hm.insert(proc_name.clone(), (triggers, rx));
+
+                                let serv_cont = serv_cont.with_params(connection_queue, hm);
+                                self.services.push_back(serv_cont);
+                            }
+                        });
+                });
+            self
+        }
+        pub fn get_stats(&self) -> String {
+            format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
+        }
+    }
+
+    #[async_trait]
+    impl ProcessUnit for Supervisor {
+        async fn process(&mut self) {
+            info!("Initializing monitoring ...");
+            loop {
+                // dbg!(&self);
+                let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
+                // let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
+                // let res = tokio::join!(prc.process(), file.process(), serv.process());
+                if let Some(mut val) = self.prcs.pop_front() {
+                    tasks.push(
+                        tokio::spawn( async move {
+                            val.process().await;
+                            ControllerResult::Process(Some(val))
+                        })
+                    );
+                }
+                if let Some(mut val) = self.files.pop_front() {
+                    tasks.push(
+                        tokio::spawn( async move {
+                            val.process().await;
+                            ControllerResult::File(Some(val))
+                        })
+                    );
+                }
+                if let Some(mut val) = self.services.pop_front() {
+                    tasks.push(
+                        tokio::spawn( async move {
+                            val.process().await;
+                            ControllerResult::Service(Some(val))
+                        })
+                    );
+                }
+                for task in tasks {
+                    match task.await {
+                        Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val),
+                        Ok(ControllerResult::File(Some(val))) => self.files.push_back(val),
+                        Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val),
+                        Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."),
+                        _ => { /* DEAD END (CAN NOT BE EXECUTED) */},
+                    }
+                }
+                tokio::time::sleep(Duration::from_millis(100)).await;
+            }
+        }
+    }
+
+    // spawn tasks
+    //     spawn prc
+    //     spawn files
+    //     spawn services
+    // ## for ... i.await in loop
+    pub async fn init_monitoring(
+        config: Processes
+    ) -> anyhow::Result<()> {
+        let mut supervisor = Supervisor::new().with_config(&config).await;
+        info!("Monitoring: {} ", &supervisor.get_stats());
+        supervisor.process().await;
+        Ok(())
+    }
+
+    // async fn generate_controllers<'a>(config: Processes) -> (HashSet<ProcessesController<'a>>, HashSet<FilesController<'a>>, HashSet<ServicesController<'a>>) {
+    //     let mut prcs: HashSet<ProcessesController<'a>> = HashSet::new();
+    //     let mut files: HashSet<FilesController<'a>> = HashSet::new();
+    //     let mut services: HashSet<ServicesController<'a>> = HashSet::new();
+    //     for prc in config.processes {
+    //         let (rx, tx) = mpsc::channel::<Events<'a>>(10);
+    //         // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path);
+    //         let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path);
+    //         let a = new_prc.process().await;
+
+    //     }
+    //     (prcs, files, services)
+    // }
+    // spawn prc check with semaphore check
+    async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) }
+
+    // spawn file check with semaphore check
+    async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) }
+
+    // spawn service check with semaphore check
+    async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) }
+}
 
 /// # Fn `run_daemons`
 /// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
 ///
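`Supervisor::process` runs a cooperative round: each controller is popped off its queue, moved into its own spawned task, and handed back through the task's `JoinHandle` so it can be requeued — which is how owned, non-`Clone` controllers satisfy `tokio::spawn`'s `'static` requirement. A stripped-down sketch of that ownership round trip (toy `Controller` type):

```rust
use std::collections::LinkedList;

struct Controller {
    name: String,
}

impl Controller {
    async fn process(&mut self) {
        println!("tick: {}", self.name);
    }
}

#[tokio::main]
async fn main() {
    let mut queue: LinkedList<Controller> = LinkedList::new();
    queue.push_back(Controller { name: "a".into() });
    queue.push_back(Controller { name: "b".into() });

    // One scheduling round: move each controller into its own task,
    // then requeue the value the finished task hands back.
    let mut tasks = Vec::new();
    while let Some(mut c) = queue.pop_front() {
        tasks.push(tokio::spawn(async move {
            c.process().await;
            c // ownership returns through the JoinHandle
        }));
    }
    for t in tasks {
        if let Ok(c) = t.await {
            queue.push_back(c);
        }
    }
    assert_eq!(queue.len(), 2);
}
```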
@ -40,37 +222,37 @@ const GET_ID_CMD: &str = "hostname";
///
/// > *hint* : give the mpsc channel capacity 1 to jump over potential errors during a running process
///
pub async fn run_daemons(
    proc: Arc<TrackingProcess>,
    tx: Arc<mpsc::Sender<u8>>,
    rx: &mut mpsc::Receiver<u8>,
) {
    // creating watchers + ---buffers---
    let mut watchers: Vec<Inotify> = vec![];
    for file in proc.dependencies.files.clone().into_iter() {
        if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
            watchers.push(watcher);
        } else {
            let _ = tx.send(121).await;
        }
        // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
    }
    let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
        Arc::new(tokio::sync::Mutex::new(watchers));
    // pub async fn run_daemons(
    //     proc: Arc<TrackingProcess>,
    //     tx: Arc<mpsc::Sender<u8>>,
    //     rx: &mut mpsc::Receiver<u8>,
    // ) {
    //     // creating watchers + ---buffers---
    //     let mut watchers: Vec<Inotify> = vec![];
    //     for file in proc.dependencies.files.clone().into_iter() {
    //         if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
    //             watchers.push(watcher);
    //         } else {
    //             let _ = tx.send(121).await;
    //         }
    //         // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
    //     }
    //     let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
    //         Arc::new(tokio::sync::Mutex::new(watchers));

    loop {
        let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
        tokio::select! {
            _ = run_hand => continue,
            _val = rx.recv() => {
                if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
                    return;
                }
            },
        }
        tokio::task::yield_now().await;
    }
}
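// Hedged usage sketch for `run_daemons` (the wrapper name `demo_run` is
// hypothetical; `tracking_process` is assumed to be already deserialized from
// config). Channel capacity 1 follows the hint in the doc comment above.
async fn demo_run(tracking_process: TrackingProcess) {
    let proc = Arc::new(tracking_process);
    let (tx, mut rx) = mpsc::channel::<u8>(1);
    run_daemons(proc, Arc::new(tx), &mut rx).await;
}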
// loop {
//     let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
//     tokio::select! {
//         _ = run_hand => continue,
//         _val = rx.recv() => {
//             if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
//                 return;
//             }
//         },
//     }
//     tokio::task::yield_now().await;
// }
// }
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError> {
    match val {
@ -133,7 +315,6 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
        },
        // // 9 - File-dependency change -> staying (after check)
        9 => {
            // no need to trash logs
            warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
            tokio::time::sleep(Duration::from_millis(100)).await;
        },
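        // Editor's note: an inferred (non-authoritative) summary of the u8
        // protocol values observed at the `tx.send(..)` call sites across this diff:
        //   1   - stop the process (file on_delete = "stop")
        //   2   - hold/freeze the process (file on_delete = "hold")
        //   3   - start_process failed
        //   4   - service lost, on_lost = "stay"
        //   5   - service lost, on_lost = "stop" (reconnect attempts exhausted)
        //   6   - service lost, on_lost = "hold" (reconnect attempts exhausted)
        //   7   - file changed, on_change = "stop"
        //   8   - file changed, on_change = "restart"
        //   9   - file event ignored ("stay")
        //   10  - unfreeze request
        //   101 - impermissible trigger value in config
        //   121 - watcher creation failed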
@ -201,36 +382,36 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
///
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
///
pub async fn running_handler(
    prc: Arc<TrackingProcess>,
    tx: Arc<mpsc::Sender<u8>>,
    watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) {
    // services and files check (once)
    let files_check = file_handler(
        &prc.name,
        &prc.dependencies.files,
        tx.clone(),
        watchers.clone(),
    );
    let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
    // pub async fn running_handler(
    //     prc: Arc<TrackingProcess>,
    //     tx: Arc<mpsc::Sender<u8>>,
    //     watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
    // ) {
    //     // services and files check (once)
    //     let files_check = file_handler(
    //         &prc.name,
    //         &prc.dependencies.files,
    //         tx.clone(),
    //         watchers.clone(),
    //     );
    //     let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
    let res = join!(files_check, services_check);
    // if inactive -> spawn checks -> active is true
    if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
        if start_process(&prc.name, &prc.path).await.is_err() {
            tx.send(3).await.unwrap();
            return;
        }
    }
    // if frozen -> spawn checks -> unfreeze is true
    else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
        tx.send(10).await.unwrap();
        return;
    }
    // tokio::time::sleep(Duration::from_millis(100)).await;
    tokio::task::yield_now().await;
}
// let res = join!(files_check, services_check);
// // if inactive -> spawn checks -> active is true
// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
//     if start_process(&prc.name, &prc.path).await.is_err() {
//         tx.send(3).await.unwrap();
//         return;
//     }
// }
// // if frozen -> spawn checks -> unfreeze is true
// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
//     tx.send(10).await.unwrap();
//     return;
// }
// // tokio::time::sleep(Duration::from_millis(100)).await;
// tokio::task::yield_now().await;
// }
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
/// # Fn `get_container_id`
@ -1,183 +1,321 @@
use crate::options::structs::{CustomError, Files};
use super::prcs::{is_active, is_frozen};
use inotify::{EventMask, Inotify, WatchMask};
use std::borrow::BorrowMut;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::time::Duration;
use crate::options::structs::Events;
use async_trait::async_trait;
/// # Fn `create_watcher`
/// ## for creating a watcher on a file's delete | update events
///
/// *input* : `&str`, `&str`
///
/// *output* : `Err` if it can't create the file watcher | `Ok(watcher)` on successful construction
///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
///
/// *managing* : current file's name: &str, path in local storage to current file: &str
///
/// *depends on* : -
///
pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
    let src = format!("{}{}", path, filename);
    let inotify: Inotify = Inotify::init()?;
    inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
    Ok(inotify)
}
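// Minimal usage sketch for the async `create_watcher` above (the wrapper name
// `watch_once` is hypothetical; the path reuses the repository's test fixture):
async fn watch_once() -> Result<(), std::io::Error> {
    let mut watcher = create_watcher("dep-file", "./tests/examples/").await?;
    let mut buffer = [0; 1024];
    // non-blocking read of any events queued since the watch was added
    if let Ok(events) = watcher.read_events(&mut buffer) {
        for event in events {
            println!("inotify event mask: {:?}", event.mask);
        }
    }
    Ok(())
}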
pub mod v2 {
    use log::{error, info, warn};
    use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
    use super::*;
    use std::{collections::HashMap, path::Path};
    type MpscSender = Arc<Sender<Events>>;
    type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;

    #[derive(Debug)]
    enum FileState {
        Ok,
        NotFound,
    }

    #[derive(Debug)]
    pub struct FilesController {
        name : Arc<str>,
        path : String,
        code_name : Arc<str>,
        state : FileState,
        watcher : Option<Inotify>,
        triggers : EventHandlers,
    }

    impl PartialEq for FilesController {
        fn eq(&self, other: &Self) -> bool {
            self.code_name == other.code_name
        }
    }

    impl FilesController {
        pub fn new(name: &str, triggers: EventHandlers) -> FilesController {
            let name: Arc<str> = Arc::from(name);
            Self {
                name : name.clone(),
                path : String::new(),
                state : FileState::Ok,
                watcher : None,
                triggers,
                code_name : name.clone(),
            }
        }
        pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController> {
            self.path = path.as_ref().to_string_lossy().into_owned();
            self.watcher = {
                match create_watcher(&self.name, &self.path) {
                    Ok(val) => Some(val),
                    Err(er) => {
                        error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
                        return Err(er)
                    }
                }
            };
            self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name));
            Ok(self)
        }
        pub fn add_event(&mut self, file_controller : FilesController) {
            for (k, v) in file_controller.triggers {
                self.triggers.entry(k).or_insert(v);
            }
        }
        async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
            for (prc_name, (triggers, channel)) in &self.triggers {
                let msg = match &trigger_type {
                    None => {
                        Events::Positive(self.code_name.clone())
                    },
                    Some(event) => {
                        info!("Event on file {} ({}) : {}. Notifying `{}` ...", &self.name, &self.path, event, &prc_name);
                        event.event_from_file_trigger_controller(self.code_name.clone(), &triggers)
                    },
                };
                let _ = channel.send(msg).await;
            }
        }
    }

    #[async_trait]
    impl ProcessUnit for FilesController {
        async fn process(&mut self) {
            // polling file check
            // 1) existing check
            // dbg!(&self);
            if let Ok(_) = check_file(&self.name, &self.path).await {
                if let FileState::NotFound = self.state {
                    info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name);
                    self.state = FileState::Ok;
                    // resetting negative outcome in prc
                    self.trigger_on(None).await;
                }
                match &mut self.watcher {
                    Some(notify) => {
                        let mut buffer = [0; 1024];
                        if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
                            // notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);});
                            if let (recreate_watcher, true) = (
                                notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
                                notif_events.any(|mask| mask.mask == EventMask::MODIFY)
                            ) {
                                warn!("File {} ({}) was changed", self.name, &self.path);
                                if recreate_watcher {
                                    // watcher recreation after dealing with the file
                                    // recreation mechanism in text editors
                                    self.watcher = match create_watcher(&self.name, &self.path) {
                                        Ok(notifier) => Some(notifier),
                                        Err(er) => {
                                            error!("Failed to recreate watcher for {} ({}) due to {}",
                                                self.name,
                                                &self.path,
                                                er
                                            );
                                            None
                                        },
                                    }
                                }
                                self.trigger_on(Some(FileTriggerType::OnChange)).await;
                                return;
                            }
                        }
                    },
                    None => { /* DEAD END */ },
                }
            } else {
                if let FileState::Ok = self.state {
                    warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
                    self.state = FileState::NotFound;
                    self.trigger_on(Some(FileTriggerType::OnDelete)).await;
                }
                return;
            }
            self.trigger_on(None).await;
            // 2) change check
        }
    }
}
/// # Fn `create_watcher`
/// ## for creating a watcher on a file's delete | update events
///
/// *input* : `&str`, `&str`
///
/// *output* : `Err` if it can't create the file watcher | `Ok(watcher)` on successful construction
///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
///
/// *managing* : current file's name: &str, path in local storage to current file: &str
///
/// *depends on* : -
///
pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
    let src = format!("{}{}", path, filename);
    let inotify: Inotify = Inotify::init()?;
    inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
    Ok(inotify)
}
/// # Fn `file_handler`
/// ## for managing processes by checking dep files' states
///
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *output* : `Err` if something with a dep file is wrong | `Ok(())` on successful dep file check
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on the sender of the main channel for the current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *depends on* : Files
///
pub async fn file_handler(
    name: &str,
    files: &[Files],
    tx: Arc<mpsc::Sender<u8>>,
    watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> anyhow::Result<()> {
    for (i, file) in files.iter().enumerate() {
        // let src = format!("{}{}", file.src, file.filename);
        if check_file(&file.filename, &file.src).await.is_err() {
            if !is_active(name).await || is_frozen(name).await {
                return Err(anyhow::Error::msg("Process is frozen or stopped"));
            }
            match file.triggers.on_delete.as_str() {
                "stay" => {
                    tx.send(9).await.unwrap();
                    continue;
                }
                "stop" => {
                    if is_active(name).await {
                        tx.send(1).await.unwrap();
                    }
                    return Err(anyhow::Error::msg("Process was stopped"));
                }
                "hold" => {
                    if is_active(name).await {
                        tx.send(2).await.unwrap();
                        return Err(anyhow::Error::msg("Process was frozen"));
                    }
                }
                _ => {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    tx.send(101).await.unwrap();
                    return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
                }
            }
        } else if is_active(name).await && !is_frozen(name).await {
            let watchers = watchers.clone();
            // println!("mutex: {:?}", watchers);
            let mut buffer = [0; 128];
            let mut mutex_guard = watchers.lock().await;
            if let Some(notify) = mutex_guard.get_mut(i) {
                let events = notify.read_events(&mut buffer);
                // println!("{:?}", events);
                if events.is_ok() {
                    let events: Vec<EventMask> = events
                        .unwrap()
                        .map(|mask| mask.mask)
                        .filter(|mask| {
                            *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
                        })
                        .collect();
                    for event in events {
                        if let EventMask::DELETE_SELF = event {
                            // ! warning (DELETE_SELF event) !
                            // println!("! warning (DELETE_SELF event) !");
                            // * watcher recreation after dealing with file recreation mechanism in text editors
                            let mutex = notify.borrow_mut();

                            // *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
                            if let Ok(watcher) = create_watcher(&file.filename, &file.src) {
                                *mutex = watcher;
                            }
                        }
                        match file.triggers.on_change.as_str() {
                            "stop" => {
                                let _ = tx.send(7).await;
                            }
                            "restart" => {
                                let _ = tx.send(8).await;
                            }
                            "stay" => {
                                let _ = tx.send(9).await;
                            }
                            _ => {
                                let _ = tx.send(101).await;
                            }
                        }
                    }
                }
            }
        }
    }
    tokio::task::yield_now().await;
    Ok(())
}

/// # Fn `check_file`
/// ## for checking existence of the current file
///
/// *input* : `&str`, `&str`
///
/// *output* : `Ok(())` if the file exists | `Err(_)` if not | panic on fs error
///
/// *initiator* : fn `file_handler`
///
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
///
/// *depends on* : network activity
///
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
    let arc_name = Arc::new(filename.to_string());
    let arc_path = Arc::new(path.to_string());
    tokio::task::spawn_blocking(move || {
        let file_concat = format!("{}{}", arc_path, arc_name);
        let path = Path::new(&file_concat);
        if path.exists() {
            Ok(())
        } else {
            Err(CustomError::Fatal)
        }
    })
    .await
    .unwrap_or_else(|_| {
        panic!("Corrupted during file check process");
    })
}
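// A hedged alternative sketch (not the project's API): the same existence
// check, but propagating a blocking-task failure instead of panicking; the
// name `check_file_soft` is hypothetical.
pub async fn check_file_soft(filename: &str, path: &str) -> anyhow::Result<bool> {
    let file_concat = format!("{}{}", path, filename);
    // run the filesystem probe off the async runtime; `?` surfaces a JoinError
    Ok(tokio::task::spawn_blocking(move || Path::new(&file_concat).exists()).await?)
}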

#[cfg(test)]
mod files_unittests {
    use super::*;
    #[tokio::test]
    async fn try_to_create_watcher() {
        let res = create_watcher("dep-file", "./tests/examples/");
        assert!(res.is_ok());
    }
    #[tokio::test]
    async fn try_to_create_invalid_watcher() {
        let res = create_watcher("invalid-file", "/path/to/the/no/dir");
        assert!(res.is_err());
    }
    #[tokio::test]
    async fn check_existing_file() {
        let res = check_file("dep-file", "./tests/examples/").await;
        assert!(res.is_ok());
    }
    #[tokio::test]
    async fn check_non_existing_file() {
        let res = check_file("invalid-file", "/path/to/the/no/dir").await;
        assert!(res.is_err());
    }
}
@ -1,8 +1,132 @@
use crate::options::structs::CustomError;
use log::{error, warn};
use std::process::{Command, Output};
use std::sync::Arc;
use tokio::time::Duration;
use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit};
use std::collections::HashSet;
use tokio::sync::mpsc::Receiver as MpscReceiver;
use async_trait::async_trait;

pub mod v2 {
    use log::info;
    use crate::options::structs::DependencyType;
    use std::path::Path;

    use super::*;
    #[derive(Debug)]
    pub struct ProcessesController {
        name: Arc<str>,
        bin: String,
        // obj: Arc<TrackingProcess>,
        state: ProcessState,
        event_reader: MpscReceiver<Events>,
        negative_events: HashSet<Arc<str>>,
    }

    impl PartialEq for ProcessesController {
        fn eq(&self, other: &Self) -> bool {
            self.bin == other.bin
        }
    }

    impl ProcessesController {
        pub fn new(name: &str, event_reader: MpscReceiver<Events>) -> ProcessesController {
            ProcessesController {
                name : Arc::from(name),
                bin : String::new(),
                state : ProcessState::Stopped,
                event_reader,
                negative_events : HashSet::new(),
            }
        }
        pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController {
            self.bin = bin.as_ref().to_string_lossy().into_owned();
            self
        }

        async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
            match trigger {
                "stay" => {
                    info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name);
                },
                "stop" => {
                    if is_active(&self.name).await {
                        info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
                        terminate_process(&self.name).await;
                        self.state = ProcessState::Stopped;
                    }
                },
                "hold" => {
                    if !is_frozen(&self.name).await {
                        info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
                        freeze_process(&self.name).await;
                        self.state = ProcessState::Holding;
                    }
                },
                "restart" => {
                    info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
                    let _ = restart_process(&self.name, &self.bin).await;
                },
                _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
            }
            tokio::time::sleep(Duration::from_micros(100)).await;
        }
    }
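    // Hedged construction sketch: the controller owns the receiving end of the
    // events channel, while file/service controllers hold `Arc`s of the sender
    // (capacity 10 mirrors the commented-out `generate_controllers` draft);
    // the wrapper name `demo_controller` and the paths are hypothetical.
    fn demo_controller() -> (ProcessesController, Arc<tokio::sync::mpsc::Sender<Events>>) {
        let (tx, rx) = tokio::sync::mpsc::channel::<Events>(10);
        let controller = ProcessesController::new("my-process", rx).with_exe("/usr/bin/my-process");
        (controller, Arc::new(tx))
    }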
    #[async_trait]
    impl ProcessUnit for ProcessesController {
        async fn process(&mut self) {
            if self.negative_events.is_empty() {
                match self.state {
                    ProcessState::Holding => {
                        info!("No negative dependency events on {} process. Unfreezing ...", self.name);
                        if let Err(er) = unfreeze_process(&self.name).await {
                            error!("Cannot unfreeze process {} : {}", self.name, er);
                        } else {
                            self.state = ProcessState::Pending;
                        }
                    },
                    ProcessState::Stopped => {
                        info!("No negative dependency events on {} process. Starting ...", self.name);
                        if let Err(er) = start_process(&self.name, &self.bin).await {
                            error!("Cannot start process {} : {}", self.name, er);
                        } else {
                            self.state = ProcessState::Pending;
                        }
                    },
                    _ => {},
                }
            }
            while let Ok(event) = self.event_reader.try_recv() {
                match event {
                    Events::Positive(target) => {
                        if self.negative_events.contains(&target) {
                            self.negative_events.remove(&target);
                        }
                    },
                    Events::Negative(event) => {
                        match event {
                            NegativeOutcomes::FileWasChanged(target, dep_type, trigger) |
                            NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) |
                            NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
                                if !self.negative_events.contains(&target) {
                                    self.negative_events.insert(target.clone());

                                    self.trigger_on(
                                        &target,
                                        &trigger,
                                        dep_type
                                    ).await;
                                }
                            },
                        }
                    },
                }
            }
        }
    }
}

/// # Fn `get_pid`
/// ## for initializing the process of continuously grabbing metrics.
@ -162,14 +286,11 @@ pub async fn freeze_process(name: &str) {
///
/// *depends on* : -
///
pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> {
    let _ = Command::new("pkill")
        .args(["-CONT", name])
        .output()?;
    Ok(())
}
/// # Fn `restart_process`
@ -185,7 +306,7 @@ pub async fn unfreeze_process(name: &str) {
///
/// *depends on* : fn `start_process`, fn `terminate_process`
///
pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> {
    terminate_process(name).await;
    tokio::time::sleep(Duration::from_millis(100)).await;
    start_process(name, path).await
@ -204,7 +325,7 @@ pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError>
///
/// *depends on* : -
///
pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> {
    // let runsh = format!("{} {}", "exec", path);
    let mut command = Command::new(path);
    // command.arg(path);
@ -215,8 +336,7 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
            Ok(())
        }
        Err(er) => {
            println!("{:?}", er);
            Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er)))
        }
    }
}
@ -1,10 +1,193 @@
use crate::options::structs::{CustomError, Services};
use super::prcs::{is_active, is_frozen};
use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use async_trait::async_trait;
pub mod v2 {
    use log::info;

    use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};

    use super::*;
    use std::collections::{HashMap, BTreeMap, VecDeque};

    type MpscSender = Arc<Sender<Events>>;
    // type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
    type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
    // type wrapper for the service wait queue
    type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
    #[derive(Debug)]
    pub struct ServicesController {
        // i.e. yandex.ru
        #[allow(unused)]
        name : String,
        // i.e. yandex.ru:443
        access_url : Arc<str>,
        // "OK" or "Unavailable"
        state: ServiceState,
        // btree map keyed by max wait time; values queue the process names (keys of the hashmap below)
        config: ConnectionQueue,
        // map of processes to their (trigger, mpsc sender) pair
        event_registrator : EventHandlers,
    }

    impl PartialEq for ServicesController {
        fn eq(&self, other: &Self) -> bool {
            self.access_url == other.access_url
        }
    }
    impl ServicesController {
        pub fn new() -> ServicesController {
            ServicesController {
                name : String::new(),
                access_url : Arc::from(String::new()),
                state : ServiceState::Unavailable,
                config: ConnectionQueue::new(),
                event_registrator : EventHandlers::new(),
            }
        }
        pub fn with_access_name(
            mut self,
            hostname: &str,
            access_url: &str,
        ) -> ServicesController {
            self.name = hostname.to_string();
            self.access_url = Arc::from(access_url);
            self
        }

        pub fn with_params(
            mut self,
            conn_queue: ConnectionQueue,
            event_reg: EventHandlers,
        ) -> ServicesController {
            self.config = conn_queue;
            self.event_registrator = event_reg;
            self
        }

        pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
            format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
        }
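        // e.g. `get_access_url("yandex.ru", Some(&443))` yields "yandex.ru:443",
        // while `get_access_url("yandex.ru", None)` yields just "yandex.ru".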
        pub fn add_process(
            &mut self,
            proc_name: &str,
            trigger: Triggers,
            sender: MpscSender,
        ) {
            let proc_name: Arc<str> = Arc::from(proc_name);
            // queue add
            if let Triggers::Service { wait, .. } = trigger {
                self.config.entry(wait)
                    .and_modify(|el| el.push_back(proc_name.clone()))
                    .or_insert({
                        let mut temp = VecDeque::new();
                        temp.push_back(proc_name.clone());
                        temp
                    });
            }
            // event add
            self.event_registrator.entry(proc_name).or_insert((trigger, sender));
        }
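        // Hedged usage sketch (the full `Triggers::Service` field set lives in
        // `options::structs` and is not shown in this diff):
        //
        // let (tx, _rx) = tokio::sync::mpsc::channel::<Events>(10);
        // svc.add_process("my-process", service_trigger, Arc::new(tx));
        //
        // where `service_trigger` is a `Triggers::Service { wait: 30, .. }`-style
        // value; its `wait` seconds key decides the slot in the wait queue.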
        async fn check_state(&self) -> anyhow::Result<()> {
            let mut addrs = self.access_url.to_socket_addrs()?;
            if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
                return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
            }
            Ok(())
        }
        async fn trigger_on(&mut self) {
            match self.state {
                ServiceState::Ok => {
                    // send positive events sequentially; a lazy `map` over async
                    // blocks would never be polled
                    for (_, (_, el)) in self.event_registrator.iter() {
                        let _ = el.send(Events::Positive(self.access_url.clone())).await;
                    }
                },
                ServiceState::Unavailable => {
                    // looped check and notifying
                    self.looped_check().await;
                },
            }
        }
        async fn looped_check(&mut self) {
            // take an owned copy of the longest configured wait, so `self` stays free to borrow below
            let longest = *self.config.last_entry().unwrap().key();
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
            let timer = tokio::time::Instant::now();
            let mut attempt: u32 = 1;
            let access_url = Arc::new(self.access_url.clone());
            // let event_registrator = &mut self.event_registrator;

            if tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
                // let access_url = access_url.clone();
                loop {
                    interval.tick().await;
                    info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
                    attempt += 1;

                    let state_check_result = self.check_state().await;

                    if state_check_result.is_ok() {
                        info!("Connection to {} is `OK` now", &access_url);
                        self.state = ServiceState::Ok;
                        break;
                    } else {
                        let now = timer.elapsed();
                        let iterator = self.config.iter()
                            .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
                            .flat_map(|(_, a)| a.iter().cloned())
                            .collect::<VecDeque<Arc<str>>>();

                        for name in iterator {
                            let proc_name = name.to_string();
                            info!("Trying to notify process `{}` ...", &proc_name);
                            let sender_opt = self.event_registrator.get(&name)
                                .map(|(trigger, sender)|
                                    (trigger.to_service_negative_event(name.clone()), sender)
                                );

                            if let Some((tr, tx)) = sender_opt {
                                let _ = tx.send(tr.unwrap()).await;
                            } else {
                                error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url)
                            }
                        }
                    }
                }
            }).await.is_err() {
                info!("Timeout while establishing connection to {}.", &access_url);
            }
        }
    }
    #[async_trait]
    impl ProcessUnit for ServicesController {
        async fn process(&mut self) {
            // check_service(hostname, port)
            let current_state = self.check_state().await;
            match (&self.state, current_state) {
                (ServiceState::Unavailable, Ok(_)) => {
                    warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
                    self.state = ServiceState::Ok;
                    self.trigger_on().await;
                },
                (ServiceState::Ok, Err(_)) => {
                    warn!("Service `{}` is unreachable for connection. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
                    self.state = ServiceState::Unavailable;
                    self.trigger_on().await;
                },
                (ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url),
                _ => { /* DEAD END WITH NO INTEREST */ },
            }
        }
    }
}

/// # Fn `service_handler`
/// ## implements the monitoring mechanism for the current process's dependency services
@ -19,53 +202,53 @@ use tokio::time::{Duration, Instant};
///
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
///
pub async fn service_handler(
    name: &str,
    services: &Vec<Services>,
    tx: Arc<mpsc::Sender<u8>>,
) -> Result<(), CustomError> {
    // println!("service daemon on {}", name);
    for serv in services {
        if check_service(&serv.hostname, &serv.port).await.is_err() {
            if !is_active(name).await || is_frozen(name).await {
                return Err(CustomError::Fatal);
            }
            error!(
                "Service {}:{} is unreachable for process {}",
                &serv.hostname, &serv.port, &name
            );
            match serv.triggers.on_lost.as_str() {
                "stay" => {
                    tx.send(4).await.unwrap();
                    continue;
                }
                "stop" => {
                    if looped_service_connecting(name, serv).await.is_err() {
                        tx.send(5).await.unwrap();
                        tokio::task::yield_now().await;
                        return Err(CustomError::Fatal);
                    }
                }
                "hold" => {
                    // if is_frozen(name).await {
                    //     return Err(CustomError::Fatal);
                    // }
                    if looped_service_connecting(name, serv).await.is_err() {
                        tx.send(6).await.unwrap();
                        tokio::task::yield_now().await;
                        return Err(CustomError::Fatal);
                    }
                }
                _ => {
                    tx.send(101).await.unwrap();
                    return Err(CustomError::Fatal);
                }
            }
        }
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
// pub async fn service_handler(
//     name: &str,
//     services: &Vec<Services>,
//     tx: Arc<mpsc::Sender<u8>>,
// ) -> Result<(), CustomError> {
//     // println!("service daemon on {}", name);
//     for serv in services {
//         if check_service(&serv.hostname, &serv.port).await.is_err() {
//             if !is_active(name).await || is_frozen(name).await {
//                 return Err(CustomError::Fatal);
//             }
//             error!(
//                 "Service {}:{} is unreachable for process {}",
//                 &serv.hostname, &serv.port, &name
//             );
//             match serv.triggers.on_lost.as_str() {
//                 "stay" => {
//                     tx.send(4).await.unwrap();
//                     continue;
//                 }
//                 "stop" => {
//                     if looped_service_connecting(name, serv).await.is_err() {
//                         tx.send(5).await.unwrap();
//                         tokio::task::yield_now().await;
//                         return Err(CustomError::Fatal);
//                     }
//                 }
//                 "hold" => {
//                     // if is_frozen(name).await {
//                     //     return Err(CustomError::Fatal);
//                     // }
//                     if looped_service_connecting(name, serv).await.is_err() {
//                         tx.send(6).await.unwrap();
//                         tokio::task::yield_now().await;
//                         return Err(CustomError::Fatal);
//                     }
//                 }
//                 _ => {
//                     tx.send(101).await.unwrap();
//                     return Err(CustomError::Fatal);
//                 }
//             }
//         }
//     }
//     tokio::time::sleep(Duration::from_millis(100)).await;
//     Ok(())
// }

/// # Fn `looped_service_connecting`
/// ## checks the service's state in a loop (with a delay and a bounded overall wait)
@ -80,54 +263,54 @@ pub async fn service_handler(
///
/// *depends on* : fn `check_service`
///
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
    if serv.triggers.wait == 0 {
        loop {
            tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
            warn!(
                "Attempting to connect from {} process to {}:{}",
                &name, &serv.hostname, &serv.port
            );
            match check_service(&serv.hostname, &serv.port).await {
                Ok(_) => {
                    log::info!(
                        "Successfully connected to {} from {} process!",
                        &serv.hostname,
                        &name
                    );
                    break;
                }
                Err(_) => {
                    tokio::task::yield_now().await;
                }
            }
        }
        Ok(())
    } else {
        let start = Instant::now();
        while start.elapsed().as_secs() < serv.triggers.wait.into() {
            tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
            warn!(
                "Attempting to connect from {} process to {}:{}",
                &name, &serv.hostname, &serv.port
            );
            match check_service(&serv.hostname, &serv.port).await {
                Ok(_) => {
                    log::info!(
                        "Successfully connected to {} from {} process!",
                        &serv.hostname,
                        &name
                    );
                    return Ok(());
                }
                Err(_) => {
                    tokio::task::yield_now().await;
                }
            }
        }
        Err(CustomError::Fatal)
    }
}
// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
//     if serv.triggers.wait == 0 {
//         loop {
//             tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
//             warn!(
//                 "Attempting to connect from {} process to {}:{}",
//                 &name, &serv.hostname, &serv.port
//             );
//             match check_service(&serv.hostname, &serv.port).await {
//                 Ok(_) => {
//                     log::info!(
//                         "Successfully connected to {} from {} process!",
//                         &serv.hostname,
//                         &name
//                     );
//                     break;
//                 }
//                 Err(_) => {
//                     tokio::task::yield_now().await;
//                 }
//             }
//         }
//         Ok(())
//     } else {
//         let start = Instant::now();
//         while start.elapsed().as_secs() < serv.triggers.wait.into() {
//             tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
//             warn!(
//                 "Attempting to connect from {} process to {}:{}",
//                 &name, &serv.hostname, &serv.port
//             );
//             match check_service(&serv.hostname, &serv.port).await {
//                 Ok(_) => {
//                     log::info!(
//                         "Successfully connected to {} from {} process!",
//                         &serv.hostname,
//                         &name
//                     );
//                     return Ok(());
//                 }
//                 Err(_) => {
//                     tokio::task::yield_now().await;
//                 }
//             }
//         }
//         Err(CustomError::Fatal)
//     }
// }

/// # Fn `check_service`
/// ## for checking the current service's availability