feature/configv2 #41
|
|
@ -4,3 +4,4 @@
|
||||||
Cargo.lock
|
Cargo.lock
|
||||||
hagent_test.sock
|
hagent_test.sock
|
||||||
release
|
release
|
||||||
|
*.sock
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "noxis-cli"
|
name = "noxis-cli"
|
||||||
version = "0.2.4"
|
version = "0.2.7"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
|
||||||
|
|
@ -2,11 +2,17 @@ use clap::{Parser, Subcommand};
|
||||||
|
|
||||||
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
|
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
|
||||||
pub struct Cli {
|
pub struct Cli {
|
||||||
|
#[arg(
|
||||||
|
short,
|
||||||
|
default_value="noxis-rs.sock",
|
||||||
|
help="explicit specify of NOXIS Socket file"
|
||||||
|
)]
|
||||||
|
pub socket : String,
|
||||||
#[command(
|
#[command(
|
||||||
subcommand,
|
subcommand,
|
||||||
help = "to manage Noxis work",
|
help = "to manage Noxis work",
|
||||||
)]
|
)]
|
||||||
command : Commands,
|
pub command : Commands,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
|
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
|
||||||
|
|
@ -50,13 +56,13 @@ pub struct StartAction {
|
||||||
num_args = 1..,
|
num_args = 1..,
|
||||||
value_delimiter = ' '
|
value_delimiter = ' '
|
||||||
)]
|
)]
|
||||||
flags : Vec<String>,
|
pub flags : Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
|
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
|
||||||
pub struct ConfigCommand {
|
pub struct ConfigCommand {
|
||||||
#[command(subcommand)]
|
#[command(subcommand)]
|
||||||
action : ConfigAction,
|
pub action : ConfigAction,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
|
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
|
||||||
|
|
@ -83,12 +89,12 @@ pub struct LocalConfig {
|
||||||
action,
|
action,
|
||||||
help = "to read following input as JSON",
|
help = "to read following input as JSON",
|
||||||
)]
|
)]
|
||||||
is_json : bool,
|
pub is_json : bool,
|
||||||
// value
|
// value
|
||||||
#[arg(
|
#[arg(
|
||||||
help = "path to config file or config String (with --json flag)",
|
help = "path to config file or config String (with --json flag)",
|
||||||
)]
|
)]
|
||||||
config : String,
|
pub config : String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
|
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
|
||||||
|
|
@ -96,16 +102,16 @@ pub struct ProcessCommand {
|
||||||
#[arg(
|
#[arg(
|
||||||
help = "name of needed process",
|
help = "name of needed process",
|
||||||
)]
|
)]
|
||||||
process : String,
|
pub process : String,
|
||||||
#[command(
|
#[command(
|
||||||
subcommand,
|
subcommand,
|
||||||
help = "To get current process's status",
|
help = "To get current process's status",
|
||||||
)]
|
)]
|
||||||
action : ProcessAction,
|
pub action : ProcessAction,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
|
#[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)]
|
||||||
enum ProcessAction {
|
pub enum ProcessAction {
|
||||||
#[command(
|
#[command(
|
||||||
about = "To get info about current process status",
|
about = "To get info about current process status",
|
||||||
)]
|
)]
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,15 @@
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use super::cli_net::NOXIS_RS_CREDS;
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum NoxisCliError {
|
pub enum NoxisCliError {
|
||||||
#[error("Can't send any data to {:?}. Noxis-rs daemon is disabled or can't be accessed", NOXIS_RS_CREDS)]
|
#[error("Can't find socket `{0}`. Error : {1}")]
|
||||||
NoxisDaemonMissing,
|
NoxisDaemonMissing(String, String),
|
||||||
#[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's web-functionality")]
|
#[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")]
|
||||||
PortIsNotWritable,
|
PortIsNotWritable,
|
||||||
#[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")]
|
#[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")]
|
||||||
CliPromptCanNotBeSent,
|
CliPromptCanNotBeSent,
|
||||||
#[error("Can't parse CLI struct and send as byte stream")]
|
#[error("Can't parse CLI struct and send as byte stream")]
|
||||||
ToStringCliParsingParsing,
|
ToStringCliParsingParsing,
|
||||||
|
#[error("Can't read Noxis response due to {0}")]
|
||||||
|
CliResponseReadError(String)
|
||||||
}
|
}
|
||||||
|
|
@ -1,32 +1,30 @@
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::UnixStream;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::{AsyncWriteExt, AsyncReadExt};
|
||||||
use tokio::time::{Duration, sleep};
|
use tokio::time::{Duration, sleep};
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use super::Cli;
|
use super::Cli;
|
||||||
use super::cli_error::NoxisCliError;
|
use super::cli_error::NoxisCliError;
|
||||||
|
|
||||||
pub const NOXIS_RS_CREDS: &str = "127.0.0.1:7753";
|
async fn create_us_stream(cli: &Cli) -> Result<UnixStream> {
|
||||||
|
Ok(UnixStream::connect(&cli.socket).await.map_err(|er| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string(), er.to_string()))?)
|
||||||
|
|
||||||
pub async fn create_tcp_stream() -> Result<TcpStream> {
|
|
||||||
Ok(TcpStream::connect(NOXIS_RS_CREDS).await.map_err(|_| NoxisCliError::NoxisDaemonMissing)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn try_send(stream: Result<TcpStream>, params: Cli) -> Result<()> {
|
pub async fn try_send(cli: Cli) -> Result<()> {
|
||||||
use serde_json::to_string;
|
// let stream = create_us_stream(&cli).await;
|
||||||
let mut stream = stream.map_err(|_| NoxisCliError::NoxisDaemonMissing)?;
|
let mut stream = create_us_stream(&cli).await?;
|
||||||
loop {
|
|
||||||
if stream.writable().await.is_err() {
|
|
||||||
sleep(Duration::from_millis(100)).await;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// let msg: Cli = from_str(&format!("{:?}", params))?;
|
|
||||||
let msg= to_string(¶ms).map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
|
|
||||||
// let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@";
|
|
||||||
|
|
||||||
stream.write_all(msg.as_bytes()).await.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
|
let msg = serde_json::to_vec(&cli)
|
||||||
// ...
|
.map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
|
||||||
break;
|
|
||||||
}
|
stream.write_all(&msg)
|
||||||
|
.await
|
||||||
|
.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
|
||||||
|
|
||||||
|
let mut response = [0; 1024];
|
||||||
|
stream.read(&mut response)
|
||||||
|
.await
|
||||||
|
.map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?;
|
||||||
|
|
||||||
|
println!("Received response: {}", String::from_utf8_lossy(&response));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -4,12 +4,12 @@ mod cli_error;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use cli::Cli;
|
use cli::Cli;
|
||||||
use cli_net::{create_tcp_stream, try_send};
|
use cli_net::try_send;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()>{
|
async fn main() -> Result<()>{
|
||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
try_send(create_tcp_stream().await, cli).await?;
|
try_send(cli).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "noxis-rs"
|
name = "noxis-rs"
|
||||||
version = "0.11.10"
|
version = "0.11.26"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
@ -8,13 +8,15 @@ anyhow = "1.0.93"
|
||||||
chrono = "0.4.38"
|
chrono = "0.4.38"
|
||||||
clap = { version = "4.5.21", features = ["derive"] }
|
clap = { version = "4.5.21", features = ["derive"] }
|
||||||
env_logger = "0.11.3"
|
env_logger = "0.11.3"
|
||||||
inotify = "0.10.2"
|
inotify = "0.11.0"
|
||||||
log = "0.4.22"
|
log = "0.4.22"
|
||||||
pcap = "2.2.0"
|
pcap = "2.2.0"
|
||||||
redis = "0.25.4"
|
redis = "0.29.2"
|
||||||
serde = { version = "1.0.203", features = ["derive"] }
|
serde = { version = "1.0.203", features = ["derive"] }
|
||||||
serde_json = "1.0.118"
|
serde_json = "1.0.118"
|
||||||
sysinfo = "0.32.0"
|
sysinfo = "0.32.0"
|
||||||
tokio = { version = "1.38.0", features = ["full", "time"] }
|
tokio = { version = "1.38.0", features = ["full", "time"] }
|
||||||
noxis-cli = { path = "../noxis-cli" }
|
noxis-cli = { path = "../noxis-cli" }
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
|
futures = "0.3.31"
|
||||||
|
async-trait = "0.1.88"
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"dateOfCreation": "1721381809104",
|
"dateOfCreation": "1721381809112",
|
||||||
"configServer": "localhost",
|
"configServer": "192.168.2.37",
|
||||||
"processes": [
|
"processes": [
|
||||||
{
|
{
|
||||||
"name": "temp-process",
|
"name": "temp-process",
|
||||||
|
|
@ -12,7 +12,7 @@
|
||||||
"src": "./tests/examples/",
|
"src": "./tests/examples/",
|
||||||
"triggers": {
|
"triggers": {
|
||||||
"onDelete": "stop",
|
"onDelete": "stop",
|
||||||
"onChange": "stay"
|
"onChange": "restart"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
|
|
@ -22,8 +22,7 @@
|
||||||
"port": 443,
|
"port": 443,
|
||||||
"triggers": {
|
"triggers": {
|
||||||
"wait": 10,
|
"wait": 10,
|
||||||
"delay": 2,
|
"onLost": "restart"
|
||||||
"onLost": "hold"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
mod options;
|
mod options;
|
||||||
mod utils;
|
mod utils;
|
||||||
|
|
||||||
use anyhow::Error;
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use options::config::*;
|
use options::config::*;
|
||||||
|
|
@ -14,84 +13,140 @@ use std::time::Duration;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use utils::*;
|
use utils::*;
|
||||||
use options::preboot::PrebootParams;
|
use options::preboot::PrebootParams;
|
||||||
|
use tokio::sync::{broadcast, oneshot};
|
||||||
|
use options::config::v2::init_config_mechanism;
|
||||||
|
use utils::v2::init_monitoring;
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
|
||||||
async fn main() -> anyhow::Result<()>{
|
async fn main() -> anyhow::Result<()>{
|
||||||
let preboot = Arc::new(PrebootParams::parse().validate()?);
|
let preboot = Arc::new(PrebootParams::parse().validate()?);
|
||||||
|
|
||||||
let _ = setup_logger();
|
let _ = setup_logger();
|
||||||
|
|
||||||
info!("Runner is configurating...");
|
info!("Noxis is configurating...");
|
||||||
|
//
|
||||||
// setting up redis connection \
|
let (tx_brd, mut rx_brd) = broadcast::channel::<Processes>(1);
|
||||||
// then conf checks to choose the most actual \
|
// cli <-> config
|
||||||
let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
|
let (tx_oneshot, rx_oneshot) = oneshot::channel::<Processes>();
|
||||||
error!("No actual configuration for runner. Stopping...");
|
|
||||||
std::process::exit(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"Current runner configuration: {}",
|
|
||||||
&processes.date_of_creation
|
|
||||||
);
|
|
||||||
info!("Runner is ready. Initializing...");
|
|
||||||
|
|
||||||
if processes.processes.is_empty() {
|
|
||||||
error!("Processes list is null, runner-rs initialization is stopped");
|
|
||||||
return Err(Error::msg("Empty processes segment in config"));
|
|
||||||
}
|
|
||||||
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
||||||
// is in need to send to the signals handler thread
|
|
||||||
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
|
|
||||||
|
|
||||||
for proc in processes.processes.iter() {
|
// initilaizing task for config manipulations
|
||||||
info!(
|
let config_module = tokio::spawn(async move {
|
||||||
"Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
|
let _ = init_config_mechanism(
|
||||||
proc.name,
|
rx_oneshot,
|
||||||
proc.path,
|
tx_brd,
|
||||||
proc.dependencies.files.len(),
|
preboot.clone()
|
||||||
proc.dependencies.services.len()
|
).await;
|
||||||
);
|
});
|
||||||
|
handler.push(config_module);
|
||||||
|
|
||||||
// creating msg channel
|
// initilaizing task for cli manipulation
|
||||||
// can or should be executed in new thread
|
let cli_module = tokio::spawn(async move {
|
||||||
let (tx, mut rx) = mpsc::channel::<u8>(1);
|
if let Err(er) = init_cli_pipeline().await {
|
||||||
let proc = Arc::new(proc.clone());
|
error!("CLI pipeline failed due to {}", er)
|
||||||
let tx = Arc::new(tx.clone());
|
|
||||||
|
|
||||||
senders.push(Arc::clone(&tx.clone()));
|
|
||||||
|
|
||||||
let event = tokio::spawn(async move {
|
|
||||||
run_daemons(proc.clone(), tx.clone(), &mut rx).await;
|
|
||||||
});
|
|
||||||
handler.push(event);
|
|
||||||
}
|
|
||||||
|
|
||||||
// destructor addition
|
|
||||||
handler.push(tokio::spawn(async move {
|
|
||||||
if set_valid_destructor(Arc::new(senders)).await.is_err() {
|
|
||||||
error!("Linux signals handler creation failed. Terminating main thread...");
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
handler.push(cli_module);
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
// initilaizing task for deinitializing `Noxis`
|
||||||
info!("End of job. Terminating main thread...");
|
let ctrlc = tokio::spawn(async move {
|
||||||
|
if let Err(er) = set_valid_destructor(vec![].into()).await {
|
||||||
|
error!("Destructor mod failed due to {}", er);
|
||||||
|
}
|
||||||
std::process::exit(0);
|
std::process::exit(0);
|
||||||
}));
|
});
|
||||||
|
handler.push(ctrlc);
|
||||||
|
|
||||||
// remote config update subscription
|
let monitoring = tokio::spawn(async move {
|
||||||
handler.push(tokio::spawn(async move {
|
let config = {
|
||||||
let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
|
let mut tick = tokio::time::interval(Duration::from_millis(500));
|
||||||
}));
|
loop {
|
||||||
|
tick.tick().await;
|
||||||
// cli pipeline
|
break match rx_brd.try_recv() {
|
||||||
handler.push(tokio::spawn(async move {
|
Ok(conf) => conf,
|
||||||
let _ = init_cli_pipeline().await;
|
Err(_) => continue,
|
||||||
}));
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(er) = init_monitoring(config).await {
|
||||||
|
error!("Monitoring mod failed due to {}", er);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
handler.push(monitoring);
|
||||||
|
|
||||||
for i in handler {
|
for i in handler {
|
||||||
let _ = i.await;
|
let _ = i.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setting up redis connection \
|
||||||
|
// then conf checks to choose the most actual \
|
||||||
|
// let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
|
||||||
|
// error!("No actual configuration for runner. Stopping...");
|
||||||
|
// std::process::exit(1);
|
||||||
|
// });
|
||||||
|
//
|
||||||
|
// info!(
|
||||||
|
// "Current runner configuration: {}",
|
||||||
|
// &processes.date_of_creation
|
||||||
|
// );
|
||||||
|
// info!("Runner is ready. Initializing...");
|
||||||
|
//
|
||||||
|
// if processes.processes.is_empty() {
|
||||||
|
// error!("Processes list is null, runner-rs initialization is stopped");
|
||||||
|
// return Err(Error::msg("Empty processes segment in config"));
|
||||||
|
// }
|
||||||
|
// let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
||||||
|
// // is in need to send to the signals handler thread
|
||||||
|
// let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
|
||||||
|
//
|
||||||
|
// for proc in processes.processes.iter() {
|
||||||
|
// info!(
|
||||||
|
// "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
|
||||||
|
// proc.name,
|
||||||
|
// proc.path,
|
||||||
|
// proc.dependencies.files.len(),
|
||||||
|
// proc.dependencies.services.len()
|
||||||
|
// );
|
||||||
|
//
|
||||||
|
// // creating msg channel
|
||||||
|
// // can or should be executed in new thread
|
||||||
|
// let (tx, mut rx) = mpsc::channel::<u8>(1);
|
||||||
|
// let proc = Arc::new(proc.clone());
|
||||||
|
// let tx = Arc::new(tx.clone());
|
||||||
|
//
|
||||||
|
// senders.push(Arc::clone(&tx.clone()));
|
||||||
|
//
|
||||||
|
// let event = tokio::spawn(async move {
|
||||||
|
// run_daemons(proc.clone(), tx.clone(), &mut rx).await;
|
||||||
|
// });
|
||||||
|
// handler.push(event);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // destructor addition
|
||||||
|
// handler.push(tokio::spawn(async move {
|
||||||
|
// if set_valid_destructor(Arc::new(senders)).await.is_err() {
|
||||||
|
// error!("Linux signals handler creation failed. Terminating main thread...");
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// tokio::time::sleep(Duration::from_millis(200)).await;
|
||||||
|
// info!("End of job. Terminating main thread...");
|
||||||
|
// std::process::exit(0);
|
||||||
|
// }));
|
||||||
|
//
|
||||||
|
// // remote config update subscription
|
||||||
|
// handler.push(tokio::spawn(async move {
|
||||||
|
// let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
|
||||||
|
// }));
|
||||||
|
//
|
||||||
|
// // cli pipeline
|
||||||
|
// handler.push(tokio::spawn(async move {
|
||||||
|
// let _ = init_cli_pipeline().await;
|
||||||
|
// }));
|
||||||
|
//
|
||||||
|
// for i in handler {
|
||||||
|
// let _ = i.await;
|
||||||
|
// }
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,9 @@
|
||||||
use log::{error, info, warn};
|
use log::{error, info};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{ UnixStream, UnixListener };
|
||||||
use anyhow::{Result as DynResult, Error};
|
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}};
|
use std::fs;
|
||||||
// use std::io::BufReader;
|
use tokio::io::{ AsyncWriteExt, AsyncReadExt};
|
||||||
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
|
|
||||||
use noxis_cli::Cli;
|
use noxis_cli::Cli;
|
||||||
use serde_json::from_str;
|
|
||||||
|
|
||||||
/// # Fn `init_cli_pipeline`
|
/// # Fn `init_cli_pipeline`
|
||||||
/// ## for catching all input requests from CLI
|
/// ## for catching all input requests from CLI
|
||||||
|
|
@ -21,49 +18,32 @@ use serde_json::from_str;
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
pub async fn init_cli_pipeline() -> DynResult<()> {
|
pub async fn init_cli_pipeline() -> anyhow::Result<()> {
|
||||||
match init_listener().await {
|
let socket_path = "noxis.sock";
|
||||||
Some(list) => {
|
let _ = fs::remove_file(socket_path);
|
||||||
|
|
||||||
|
match UnixListener::bind(socket_path) {
|
||||||
|
Ok(list) => {
|
||||||
|
// TODO: remove `unwrap`s
|
||||||
|
info!("Listening on {}", socket_path);
|
||||||
loop {
|
loop {
|
||||||
if let Ok((socket, addr)) = list.accept().await {
|
match list.accept().await {
|
||||||
// isolation
|
Ok((socket, _)) => {
|
||||||
if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() {
|
// tokio::spawn();
|
||||||
warn!("Declined attempt to connect TCP-socket from {}", addr);
|
process_connection(socket).await;
|
||||||
continue;
|
},
|
||||||
}
|
Err(er) => {
|
||||||
process_connection(socket).await;
|
error!("Cannot poll connection to CLI due to {}", er);
|
||||||
|
sleep(Duration::from_millis(300)).await;
|
||||||
|
},
|
||||||
}
|
}
|
||||||
sleep(Duration::from_millis(500)).await;
|
|
||||||
}
|
}
|
||||||
// Ok(())
|
// Ok(())
|
||||||
},
|
},
|
||||||
None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use"))
|
Err(er) => {
|
||||||
}
|
error!("Failed to open UnixListener for CLI");
|
||||||
}
|
Err(er.into())
|
||||||
|
|
||||||
/// # Fn `init_listener`
|
|
||||||
/// ## for creating TCP-listener for communicating with CLI
|
|
||||||
///
|
|
||||||
/// *input* : -
|
|
||||||
///
|
|
||||||
/// *output* : `Some<TcpListener>` if port 7753 was opened | None if not
|
|
||||||
///
|
|
||||||
/// *initiator* : fn `init_cli_pipeline`
|
|
||||||
///
|
|
||||||
/// *managing* : `TcpListener` object to handle requests
|
|
||||||
///
|
|
||||||
/// *depends on* : `tokio::net::TcpListener`
|
|
||||||
///
|
|
||||||
async fn init_listener() -> Option<TcpListener> {
|
|
||||||
match TcpListener::bind("127.0.0.1:7753").await {
|
|
||||||
Ok(listener) => {
|
|
||||||
info!("Runner is listening localhost:7753");
|
|
||||||
Some(listener)
|
|
||||||
},
|
},
|
||||||
Err(_) => {
|
|
||||||
error!("Cannot create TCP listener for CLI");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -80,27 +60,29 @@ async fn init_listener() -> Option<TcpListener> {
|
||||||
///
|
///
|
||||||
/// *depends on* : `tokio::net::TcpStream`
|
/// *depends on* : `tokio::net::TcpStream`
|
||||||
///
|
///
|
||||||
async fn process_connection(mut stream: TcpStream) {
|
async fn process_connection(mut stream: UnixStream) {
|
||||||
let buf_reader = BufReader::new(stream.borrow_mut());
|
let mut buf = vec![0; 1024];
|
||||||
let mut rqst = buf_reader.lines();
|
match stream.read(&mut buf).await {
|
||||||
|
Ok(0) => {
|
||||||
|
info!("Client disconnected ");
|
||||||
while let Ok(Some(line)) = rqst.next_line().await {
|
},
|
||||||
if line.is_empty() {
|
Ok(n) => {
|
||||||
break
|
buf.truncate(n);
|
||||||
}
|
info!("CLI have sent {} bytes", n);
|
||||||
match from_str::<Cli>(&line) {
|
match serde_json::from_slice::<Cli>(&buf) {
|
||||||
Ok(req) => {
|
Ok(cli) => {
|
||||||
// TODO: func wrapper
|
info!("Received CLI request: {:?}", cli);
|
||||||
dbg!(req);
|
let response = "OK";
|
||||||
},
|
if let Err(e) = stream.write_all(response.as_bytes()).await {
|
||||||
Err(_) => {
|
error!("Failed to send response: {}", e);
|
||||||
break
|
}
|
||||||
},
|
}
|
||||||
}
|
Err(e) => {
|
||||||
println!("{}", line);
|
error!("Failed to parse CLI request: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => error!("Failed to read from socket: {}", e),
|
||||||
}
|
}
|
||||||
|
let _ = stream.shutdown().await;
|
||||||
let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!";
|
|
||||||
stream.write_all(response.as_bytes()).await.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,9 +9,393 @@ use std::sync::Arc;
|
||||||
use std::{env, fs};
|
use std::{env, fs};
|
||||||
use super::preboot::PrebootParams;
|
use super::preboot::PrebootParams;
|
||||||
use tokio::time::{Duration, sleep};
|
use tokio::time::{Duration, sleep};
|
||||||
|
// use redis::PubSub;
|
||||||
|
use tokio::sync::{
|
||||||
|
oneshot,
|
||||||
|
oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender },
|
||||||
|
broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver };
|
||||||
|
use crate::utils::files::create_watcher;
|
||||||
|
use std::fs::File;
|
||||||
|
use inotify::EventMask;
|
||||||
|
|
||||||
// const CONFIG_PATH: &str = "settings.json";
|
// const CONFIG_PATH: &str = "settings.json";
|
||||||
|
|
||||||
|
pub mod v2 {
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use crate::utils::get_container_id;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
pub async fn init_config_mechanism(
|
||||||
|
// to handle cli config changes
|
||||||
|
cli_oneshot: OneShotReciever<Processes>,
|
||||||
|
// to share local config with PRCS, CLI_PIPELINE and CONFIG modules
|
||||||
|
brd_tx : BroadcastSender<Processes>,
|
||||||
|
// preboot params (args)
|
||||||
|
params : Arc<PrebootParams>
|
||||||
|
/*...*/
|
||||||
|
) {
|
||||||
|
// channel for pubsub to handle local config pulling
|
||||||
|
let local_config_brd_reciever = brd_tx.subscribe();
|
||||||
|
// channel between pub-sub mech and local config mech
|
||||||
|
let (tx_pb_lc, rx_pb_lc) = oneshot::channel::<bool>();
|
||||||
|
// channel between cli mech and local config mech
|
||||||
|
let (tx_cli_lc, rx_cli_lc) = oneshot::channel::<bool>();
|
||||||
|
|
||||||
|
// dbg!("before lc");
|
||||||
|
let params_clone = params.clone();
|
||||||
|
let for_lc_path = params.clone();
|
||||||
|
let lc_path = for_lc_path
|
||||||
|
.config
|
||||||
|
.to_str()
|
||||||
|
.unwrap_or("settings.json");
|
||||||
|
|
||||||
|
// future to init work with local config
|
||||||
|
let lc_future = tokio::spawn(
|
||||||
|
// let params = params.clone();
|
||||||
|
local_config_reciever(
|
||||||
|
params_clone,
|
||||||
|
rx_pb_lc,
|
||||||
|
rx_cli_lc,
|
||||||
|
Arc::new(brd_tx)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
// dbg!("before pb");
|
||||||
|
// future to init work with pub sub mechanism
|
||||||
|
let pubsub_future = tokio::spawn(
|
||||||
|
pubsub_config_reciever(
|
||||||
|
tx_pb_lc,
|
||||||
|
params.clone(),
|
||||||
|
local_config_brd_reciever
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
// dbg!("before cli");
|
||||||
|
// future to catch new configs from cli pipeline
|
||||||
|
let cli_future = tokio::spawn(
|
||||||
|
from_cli_config_reciever(
|
||||||
|
cli_oneshot,
|
||||||
|
tx_cli_lc
|
||||||
|
)
|
||||||
|
|
||||||
|
);
|
||||||
|
// let _ = lc_future.await;
|
||||||
|
// dbg!("before select");
|
||||||
|
tokio::select! {
|
||||||
|
lc_result = lc_future => {
|
||||||
|
// dbg!("end of lc");
|
||||||
|
match lc_result {
|
||||||
|
Ok(res) => {
|
||||||
|
if res.is_ok() {
|
||||||
|
info!("Local config warding mechanism stopped, waiting for others ...");
|
||||||
|
sleep(Duration::from_millis(500)).await;
|
||||||
|
let _ = restart_main_thread();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
error!("Local config warding mechanism crushed, restarting ...");
|
||||||
|
let _ = restart_main_thread();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Local config warding mechanism crushed, restarting ...");
|
||||||
|
let _ = restart_main_thread();
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
pb_result = pubsub_future => {
|
||||||
|
match pb_result {
|
||||||
|
Ok(res) => {
|
||||||
|
if res.is_ok() {
|
||||||
|
info!("New config was saved locally, restarting ...");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
error!("Pubsub mechanism crushed, restarting ...");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Pubsub mechanism crushed, restarting ...");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
let _ = restart_main_thread();
|
||||||
|
},
|
||||||
|
cli_config_option = cli_future => {
|
||||||
|
match cli_config_option {
|
||||||
|
Err(_) => error!("CLI pulling new config mechanism crushed, restarting ..."),
|
||||||
|
Ok(option_config) => {
|
||||||
|
match option_config {
|
||||||
|
None => error!("CLI pulling new config mechanism crushed, restarting ..."),
|
||||||
|
Some(config) => {
|
||||||
|
info!("New config was pulled from CLI, saving and restarting ...");
|
||||||
|
let _ = save_new_config(&config, lc_path);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
let _ = restart_main_thread();
|
||||||
|
},
|
||||||
|
}
|
||||||
|
// dbg!("after select");
|
||||||
|
// TODO! futures + select! [OK]
|
||||||
|
// TODO! tests config
|
||||||
|
}
|
||||||
|
pub async fn get_redis_connection(params: &str) -> Option<Connection> {
|
||||||
|
for i in 1..=3 {
|
||||||
|
let redis_url = format!("redis://{}/", params);
|
||||||
|
info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i);
|
||||||
|
if let Ok(client) = Client::open(redis_url) {
|
||||||
|
if let Ok(conn) = client.get_connection() {
|
||||||
|
info!("Successfully opened Redis connection");
|
||||||
|
return Some(conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
error!("Error with subscribing Redis stream on update. Retrying in 5 secs...");
|
||||||
|
sleep(Duration::from_secs(5)).await;
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
// loop checking redis pubsub
|
||||||
|
async fn pubsub_config_reciever(
|
||||||
|
// to stop checking local config
|
||||||
|
local_conf_tx : OneShotSender<bool>,
|
||||||
|
params : Arc<PrebootParams>,
|
||||||
|
tx_brd_local : BroadcastReceiver<Processes>,
|
||||||
|
) -> anyhow::Result<()>{
|
||||||
|
/*...*/
|
||||||
|
// dbg!("start of pb");
|
||||||
|
let mut tx_brd_local = tx_brd_local;
|
||||||
|
let local_config = if !tx_brd_local.is_empty() {
|
||||||
|
tx_brd_local.recv().await?
|
||||||
|
} else {
|
||||||
|
// Processes::default()
|
||||||
|
let mut tick = tokio::time::interval(Duration::from_millis(500));
|
||||||
|
loop {
|
||||||
|
tick.tick().await;
|
||||||
|
break match tx_brd_local.recv().await {
|
||||||
|
Ok(conf) => conf,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
match get_redis_connection(&local_config.config_server).await {
|
||||||
|
Some(mut conn) => {
|
||||||
|
let mut pub_sub = conn.as_pubsub();
|
||||||
|
let channel_name = get_container_id().unwrap_or(String::from("default"));
|
||||||
|
let channel_name = channel_name.trim();
|
||||||
|
match pub_sub.subscribe(channel_name) {
|
||||||
|
Err(er) => {
|
||||||
|
error!("Cannot subscribe pubsub channel due to {}", &er);
|
||||||
|
return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
|
||||||
|
},
|
||||||
|
Ok(_) => {
|
||||||
|
info!("Successfully subscribed to {} pubsub channel", channel_name);
|
||||||
|
let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3)));
|
||||||
|
loop {
|
||||||
|
if let Ok(msg) = pub_sub.get_message() {
|
||||||
|
// dbg!("ok on get message");
|
||||||
|
let payload : Result<String, _> = msg.get_payload();
|
||||||
|
match payload {
|
||||||
|
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
|
||||||
|
Ok(payload) => {
|
||||||
|
if let Some(remote) = parse_extern_config(&payload) {
|
||||||
|
match config_comparing(&local_config, &remote) {
|
||||||
|
ConfigActuality::Local => {
|
||||||
|
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
|
||||||
|
},
|
||||||
|
ConfigActuality::Remote => {
|
||||||
|
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
|
||||||
|
// to stop watching local config file mechanism
|
||||||
|
let _ = local_conf_tx.send(true);
|
||||||
|
let config_path = params.config.to_str().unwrap_or("settings.json");
|
||||||
|
|
||||||
|
if save_new_config(&remote, &config_path).is_err() {
|
||||||
|
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
|
||||||
|
return Err(anyhow::Error::msg(
|
||||||
|
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
|
||||||
|
))
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
warn!("Invalid config was pulled from Redis channel")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// delay
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
sleep(Duration::from_secs(20)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
async fn local_config_reciever(
|
||||||
|
params : Arc<PrebootParams>,
|
||||||
|
pubsub_oneshot : OneShotReciever<bool>,
|
||||||
|
cli_oneshot : OneShotReciever<bool>,
|
||||||
|
brd_tx : Arc<BroadcastSender<Processes>>,
|
||||||
|
/*...*/
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
/*...*/
|
||||||
|
// shadowing as mut
|
||||||
|
let mut pubsub_oneshot = pubsub_oneshot;
|
||||||
|
let mut cli_oneshot = cli_oneshot;
|
||||||
|
// fill with default empty config, mut to change later
|
||||||
|
let mut _current_config = Processes::default();
|
||||||
|
// PathBuf to &str to work with local config path as slice
|
||||||
|
let local_config_path = params
|
||||||
|
.config
|
||||||
|
.to_str()
|
||||||
|
.unwrap_or("settings.json");
|
||||||
|
|
||||||
|
match load_processes(local_config_path) {
|
||||||
|
// if local exists
|
||||||
|
Some(conf) => {
|
||||||
|
info!("Local config `{}` was found.", &conf.date_of_creation);
|
||||||
|
_current_config = conf;
|
||||||
|
if let Err(er) = brd_tx.send(_current_config.clone()) {
|
||||||
|
error!("Cannot share local config with broadcast due to {}", er);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// if local is not exist
|
||||||
|
None => {
|
||||||
|
warn!("Local config wasn't found. Waiting for new ...");
|
||||||
|
return Err(anyhow::Error::msg("No local config"));
|
||||||
|
// ...
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 100% local exists here
|
||||||
|
// create watcher on local config file
|
||||||
|
match create_watcher("", local_config_path) {
|
||||||
|
Ok(mut watcher) => {
|
||||||
|
loop {
|
||||||
|
let mut need_to_export_config = false;
|
||||||
|
// let mut need_to_recreate_watcher = false;
|
||||||
|
// return situations here
|
||||||
|
// 1) oneshot signal
|
||||||
|
// 2) if config was deleted -> recreate and fill with current config that is held here
|
||||||
|
// 3) if config was changed -> fill with current config that is held here
|
||||||
|
|
||||||
|
// catching signal from pubsub
|
||||||
|
// it's because pubsub mech pulled new valid and actual config and now it's time to ...
|
||||||
|
// ... overwrite local config file and restart main thread
|
||||||
|
if let Ok(_) = pubsub_oneshot.try_recv() {
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// catching signal from cli
|
||||||
|
// it's because cli mech pulled new valid and actual config and now it's time to ...
|
||||||
|
// ... overwrite local config file and restart main thread (like in previous mechanism)
|
||||||
|
if let Ok(_) = cli_oneshot.try_recv() {
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ...
|
||||||
|
// ! ... FROM WATCHER"S BUFFER
|
||||||
|
|
||||||
|
// existing check
|
||||||
|
if !params.config.exists() {
|
||||||
|
warn!("Local config file was deleted or moved. Recreating new one with saved data ...");
|
||||||
|
need_to_export_config = true;
|
||||||
|
// need_to_recreate_watcher = true;
|
||||||
|
} else {
|
||||||
|
// changes check
|
||||||
|
let mut buffer = [0; 128];
|
||||||
|
let events = watcher.read_events(&mut buffer);
|
||||||
|
if events.is_ok() {
|
||||||
|
let events: Vec<EventMask> = events
|
||||||
|
.unwrap()
|
||||||
|
.map(|mask| mask.mask)
|
||||||
|
.filter(|mask| {
|
||||||
|
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
if !events.is_empty() {
|
||||||
|
warn!("Local config file was overwritten. Discarding changes ...");
|
||||||
|
need_to_export_config = true;
|
||||||
|
// events
|
||||||
|
// .iter()
|
||||||
|
// .any(|event| *event == EventMask::DELETE_SELF)
|
||||||
|
// .then(|| need_to_recreate_watcher = true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// exporting data
|
||||||
|
if need_to_export_config {
|
||||||
|
if let Err(er) = export_saved_config_data_locally(¶ms.config, &_current_config).await {
|
||||||
|
error!("Cannot save actual imported config due to {}", er);
|
||||||
|
} else {
|
||||||
|
// recreation watcher (draining activity buffer mechanism)
|
||||||
|
// if local config file was deleted and recreated
|
||||||
|
// if local config file was modified locally
|
||||||
|
match create_watcher("", local_config_path) {
|
||||||
|
Ok(new) => watcher = new,
|
||||||
|
Err(er) => error!("Cannot create new watcher due to {}", er),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sleep(Duration::from_millis(300)).await;
|
||||||
|
// tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path);
|
||||||
|
return Err(anyhow::Error::msg("Cannot create watcher on local config file"));
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// [:IN-TEST]
|
||||||
|
async fn from_cli_config_reciever(
|
||||||
|
cli_oneshot: OneShotReciever<Processes>,
|
||||||
|
to_local_tx: OneShotSender<bool>
|
||||||
|
) -> Option<Processes> {
|
||||||
|
/* match awaits til channel*/
|
||||||
|
// dbg!("start of cli");
|
||||||
|
loop {
|
||||||
|
if !cli_oneshot.is_empty() {
|
||||||
|
match cli_oneshot.await {
|
||||||
|
Ok(config_from_cli) => {
|
||||||
|
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
|
||||||
|
let _ = to_local_tx.send(true);
|
||||||
|
return Some(config_from_cli)
|
||||||
|
},
|
||||||
|
_ => return None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sleep(Duration::from_millis(300)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn export_saved_config_data_locally(
|
||||||
|
config_file_path: &PathBuf,
|
||||||
|
current_config: &Processes
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
|
||||||
|
let mut file = File::create(config_file_path)?;
|
||||||
|
file.write_all(
|
||||||
|
serde_json::to_string_pretty(current_config)?.as_bytes()
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
// Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// # Fn `load_processes`
|
/// # Fn `load_processes`
|
||||||
/// ## for reading and parsing *local* storing config
|
/// ## for reading and parsing *local* storing config
|
||||||
///
|
///
|
||||||
|
|
@ -54,14 +438,14 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
|
||||||
error!("Invalid character in config file. Config path was set to default");
|
error!("Invalid character in config file. Config path was set to default");
|
||||||
"settings.json"
|
"settings.json"
|
||||||
});
|
});
|
||||||
info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url);
|
info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url);
|
||||||
match load_processes(config_path) {
|
match load_processes(config_path) {
|
||||||
Some(local_conf) => {
|
Some(local_conf) => {
|
||||||
info!(
|
info!(
|
||||||
"Found local configuration, version - {}",
|
"Found local configuration, version - {}",
|
||||||
&local_conf.date_of_creation
|
&local_conf.date_of_creation
|
||||||
);
|
);
|
||||||
if !params.no_remote_config {
|
if !params.no_sub {
|
||||||
if let Some(remote_conf) =
|
if let Some(remote_conf) =
|
||||||
// TODO : rework with pubsub mech
|
// TODO : rework with pubsub mech
|
||||||
once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url))
|
once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url))
|
||||||
|
|
@ -85,7 +469,7 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
warn!("No local valid conf was found. Trying to pull remote one...");
|
warn!("No local valid conf was found. Trying to pull remote one...");
|
||||||
if !params.no_remote_config {
|
if !params.no_sub {
|
||||||
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url)));
|
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url)));
|
||||||
if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
|
if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
|
||||||
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
|
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
|
||||||
|
|
@ -322,7 +706,7 @@ fn restart_main_thread() -> std::io::Result<()> {
|
||||||
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
|
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
|
||||||
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
|
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
|
||||||
|
|
||||||
if params.no_sub || params.no_remote_config {
|
if params.no_sub {
|
||||||
return Err(CustomError::Fatal);
|
return Err(CustomError::Fatal);
|
||||||
}
|
}
|
||||||
if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) {
|
if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) {
|
||||||
|
|
@ -397,6 +781,9 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<Pr
|
||||||
/// *depends on* : `Processes`, `ConfigActuality`
|
/// *depends on* : `Processes`, `ConfigActuality`
|
||||||
///
|
///
|
||||||
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
||||||
|
if local.is_default() {
|
||||||
|
return ConfigActuality::Remote;
|
||||||
|
}
|
||||||
let local_date: u64 = local.date_of_creation.parse().unwrap();
|
let local_date: u64 = local.date_of_creation.parse().unwrap();
|
||||||
let remote_date: u64 = remote.date_of_creation.parse().unwrap();
|
let remote_date: u64 = remote.date_of_creation.parse().unwrap();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ enum EnvVars {
|
||||||
NoxisNoHagent,
|
NoxisNoHagent,
|
||||||
NoxisNoLogs,
|
NoxisNoLogs,
|
||||||
NoxisRefreshLogs,
|
NoxisRefreshLogs,
|
||||||
NoxisNoRemoteConfig,
|
// NoxisNoRemoteConfig,
|
||||||
NoxisNoConfigSub,
|
NoxisNoConfigSub,
|
||||||
NoxisSocketPath,
|
NoxisSocketPath,
|
||||||
NoxisLogTo,
|
NoxisLogTo,
|
||||||
|
|
@ -29,7 +29,7 @@ impl std::fmt::Display for EnvVars {
|
||||||
EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"),
|
EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"),
|
||||||
EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"),
|
EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"),
|
||||||
EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"),
|
EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"),
|
||||||
EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"),
|
// EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"),
|
||||||
EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"),
|
EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"),
|
||||||
EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"),
|
EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"),
|
||||||
EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"),
|
EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"),
|
||||||
|
|
@ -48,7 +48,7 @@ impl<'a> EnvVars {
|
||||||
EnvVars::NoxisNoHagent => "false",
|
EnvVars::NoxisNoHagent => "false",
|
||||||
EnvVars::NoxisNoLogs => "false",
|
EnvVars::NoxisNoLogs => "false",
|
||||||
EnvVars::NoxisRefreshLogs => "false",
|
EnvVars::NoxisRefreshLogs => "false",
|
||||||
EnvVars::NoxisNoRemoteConfig => "false",
|
// EnvVars::NoxisNoRemoteConfig => "false",
|
||||||
EnvVars::NoxisNoConfigSub => "false",
|
EnvVars::NoxisNoConfigSub => "false",
|
||||||
EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock",
|
EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock",
|
||||||
EnvVars::NoxisLogTo => "./",
|
EnvVars::NoxisLogTo => "./",
|
||||||
|
|
@ -77,7 +77,7 @@ impl<'a> EnvVars {
|
||||||
Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string());
|
Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string());
|
||||||
Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string());
|
Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string());
|
||||||
Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string());
|
Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string());
|
||||||
Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string());
|
// Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string());
|
||||||
Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string());
|
Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string());
|
||||||
Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap());
|
Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap());
|
||||||
Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap());
|
Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap());
|
||||||
|
|
@ -147,12 +147,6 @@ impl std::fmt::Display for MetricsPrebootParams {
|
||||||
/// noxis-rs ... --refresh-logs ...
|
/// noxis-rs ... --refresh-logs ...
|
||||||
/// ```
|
/// ```
|
||||||
///
|
///
|
||||||
/// `--no-remote-config` - to disable work with Redis as config producer
|
|
||||||
/// ### usage :
|
|
||||||
/// ``` bash
|
|
||||||
/// noxis-rs ... --no-remote-config ...
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
/// `--no-sub` - to disable Redis subscribtion mechanism
|
/// `--no-sub` - to disable Redis subscribtion mechanism
|
||||||
/// ### usage :
|
/// ### usage :
|
||||||
/// ``` bash
|
/// ``` bash
|
||||||
|
|
@ -212,17 +206,18 @@ pub struct PrebootParams {
|
||||||
help="To clear logs directory"
|
help="To clear logs directory"
|
||||||
)]
|
)]
|
||||||
pub refresh_logs : bool,
|
pub refresh_logs : bool,
|
||||||
#[arg(
|
// #[arg(
|
||||||
long = "no-remote-config",
|
// long = "no-remote-config",
|
||||||
action,
|
// action,
|
||||||
help="To disable work with remote config server",
|
// help="To disable work with remote config server",
|
||||||
conflicts_with="no_sub")]
|
// conflicts_with="no_sub")]
|
||||||
pub no_remote_config : bool,
|
// pub no_remote_config : bool,
|
||||||
#[arg(
|
#[arg(
|
||||||
long = "no-sub",
|
long = "no-sub",
|
||||||
action,
|
action,
|
||||||
help="To disable subscription mechanism",
|
help="To disable Redis subscription mechanism",
|
||||||
conflicts_with="no_remote_config")]
|
)]
|
||||||
|
// conflicts_with="no_remote_config"
|
||||||
pub no_sub : bool,
|
pub no_sub : bool,
|
||||||
|
|
||||||
// params (socket_path, log_to, remote_server_url, config)
|
// params (socket_path, log_to, remote_server_url, config)
|
||||||
|
|
@ -243,7 +238,7 @@ pub struct PrebootParams {
|
||||||
#[arg(
|
#[arg(
|
||||||
long = "remote-server-url",
|
long = "remote-server-url",
|
||||||
default_value="localhost",
|
default_value="localhost",
|
||||||
conflicts_with="no_remote_config",
|
conflicts_with="no_sub",
|
||||||
help = "To set url of remote config server using in remote config pulling mechanism"
|
help = "To set url of remote config server using in remote config pulling mechanism"
|
||||||
)]
|
)]
|
||||||
pub remote_server_url : String,
|
pub remote_server_url : String,
|
||||||
|
|
@ -288,15 +283,17 @@ impl PrebootParams {
|
||||||
// existing log dir
|
// existing log dir
|
||||||
if !self.log_to.exists() && !self.no_logs {
|
if !self.log_to.exists() && !self.no_logs {
|
||||||
eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default");
|
eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default");
|
||||||
|
self.refresh_logs = false;
|
||||||
self.log_to = PathBuf::from("./");
|
self.log_to = PathBuf::from("./");
|
||||||
// return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
|
// return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
|
||||||
}
|
}
|
||||||
// existing sock file
|
// existing sock file
|
||||||
if !self.config.exists() {
|
if !self.config.exists() {
|
||||||
eprintln!("Error: Invalid character in config file. Config path was set to default");
|
eprintln!("Error: Invalid character in config file. Config path was set to default");
|
||||||
let config = PathBuf::from("/etc/settings.json");
|
// TODO : ??? wtf is going with 2 paths
|
||||||
if !config.exists() && self.no_remote_config {
|
let config = PathBuf::from("/etc/enode/noxis/settings.json");
|
||||||
return Err(Error::msg("Noxis cannot run without config. Create local config or enable remote-config mechanism"));
|
if !config.exists() && self.no_sub {
|
||||||
|
return Err(Error::msg("Noxis cannot run without config. Create local config or enable pubsub mechanism"));
|
||||||
}
|
}
|
||||||
self.config = PathBuf::from("settings.json");
|
self.config = PathBuf::from("settings.json");
|
||||||
// return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
|
// return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
|
||||||
|
|
@ -353,20 +350,20 @@ mod preboot_unitests{
|
||||||
"runner-rs",
|
"runner-rs",
|
||||||
"--no-sub",
|
"--no-sub",
|
||||||
"--remote-server-url", "redis://127.0.0.1"
|
"--remote-server-url", "redis://127.0.0.1"
|
||||||
]).is_ok())
|
|
||||||
}
|
|
||||||
#[test]
|
|
||||||
fn parsing_config_invalid_args_noremote_nosub() {
|
|
||||||
assert!(PrebootParams::try_parse_from(vec![
|
|
||||||
"runner-rs",
|
|
||||||
"--no-remote-config", "--no-sub"
|
|
||||||
]).is_err())
|
]).is_err())
|
||||||
}
|
}
|
||||||
|
// #[test]
|
||||||
|
// fn parsing_config_invalid_args_noremote_nosub() {
|
||||||
|
// assert!(PrebootParams::try_parse_from(vec![
|
||||||
|
// "runner-rs",
|
||||||
|
// "--no-remote-config", "--no-sub"
|
||||||
|
// ]).is_err())
|
||||||
|
// }
|
||||||
#[test]
|
#[test]
|
||||||
fn parsing_config_invalid_args_noremote_remoteurl() {
|
fn parsing_config_invalid_args_noremote_remoteurl() {
|
||||||
assert!(PrebootParams::try_parse_from(vec![
|
assert!(PrebootParams::try_parse_from(vec![
|
||||||
"runner-rs",
|
"runner-rs",
|
||||||
"--no-remote-config",
|
"--no-sub",
|
||||||
"--remote-server-url", "redis://127.0.0.1"
|
"--remote-server-url", "redis://127.0.0.1"
|
||||||
]).is_err())
|
]).is_err())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ type SendersVec = Arc<Vec<Arc<mpsc::Sender<u8>>>>;
|
||||||
///
|
///
|
||||||
/// *depends on* : Sig, Signals
|
/// *depends on* : Sig, Signals
|
||||||
///
|
///
|
||||||
pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError> {
|
pub async fn set_valid_destructor(senders: SendersVec) -> anyhow::Result<()> {
|
||||||
let (mut int, mut term, mut stop) = (
|
let (mut int, mut term, mut stop) = (
|
||||||
Sig::new(Signals::Sigint, senders.clone()),
|
Sig::new(Signals::Sigint, senders.clone()),
|
||||||
Sig::new(Signals::Sigterm, senders.clone()),
|
Sig::new(Signals::Sigterm, senders.clone()),
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,114 @@
|
||||||
|
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum DependencyType {
|
||||||
|
File,
|
||||||
|
Service,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ServiceState {
|
||||||
|
Ok,
|
||||||
|
Unavailable
|
||||||
|
}
|
||||||
|
pub struct ServiceWaitConfig(u32);
|
||||||
|
|
||||||
|
impl Default for ServiceWaitConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self(5)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum FileTriggerType {
|
||||||
|
OnChange,
|
||||||
|
OnDelete,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for FileTriggerType {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
return match self {
|
||||||
|
FileTriggerType::OnChange => write!(f, "File was changed"),
|
||||||
|
FileTriggerType::OnDelete => write!(f, "File was moved or deleted"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> FileTriggerType {
|
||||||
|
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
|
||||||
|
return match self {
|
||||||
|
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
|
||||||
|
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
|
||||||
|
return match self {
|
||||||
|
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
|
||||||
|
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger.on_delete.clone())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Triggers {
|
||||||
|
File { on_change: Arc<str>, on_delete: Arc<str> },
|
||||||
|
Service {on_lost: Arc<str>, wait: u32},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Triggers {
|
||||||
|
pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers {
|
||||||
|
Triggers::File { on_change, on_delete }
|
||||||
|
}
|
||||||
|
pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers {
|
||||||
|
Triggers::Service{on_lost, wait: wait_time}
|
||||||
|
}
|
||||||
|
pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> {
|
||||||
|
if let Triggers::Service { on_lost, .. } = self {
|
||||||
|
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone())))
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> }
|
||||||
|
pub struct ServiceTriggersForController(Arc<str>);
|
||||||
|
|
||||||
|
impl std::fmt::Display for DependencyType {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
return match self {
|
||||||
|
DependencyType::File => write!(f, "File"),
|
||||||
|
DependencyType::Service => write!(f, "Service"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ProcessState {
|
||||||
|
Pending,
|
||||||
|
Holding,
|
||||||
|
Stopped,
|
||||||
|
StoppedByCli,
|
||||||
|
}
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Events {
|
||||||
|
Positive(Arc<str>),
|
||||||
|
Negative(NegativeOutcomes)
|
||||||
|
}
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum NegativeOutcomes {
|
||||||
|
FileWasChanged(Arc<str>, DependencyType, Arc<str>),
|
||||||
|
FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>),
|
||||||
|
ServiceIsUnreachable(Arc<str>, DependencyType, Arc<str>),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait ProcessUnit {
|
||||||
|
async fn process(&mut self);
|
||||||
|
}
|
||||||
/// # an Error enum (next will be deleted and replaced)
|
/// # an Error enum (next will be deleted and replaced)
|
||||||
pub enum CustomError {
|
pub enum CustomError {
|
||||||
Fatal,
|
Fatal,
|
||||||
|
|
@ -40,6 +147,22 @@ pub struct Processes {
|
||||||
pub processes: Vec<TrackingProcess>,
|
pub processes: Vec<TrackingProcess>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for Processes {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
date_of_creation : String::new(),
|
||||||
|
config_server : String::from("default"),
|
||||||
|
processes : Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Processes {
|
||||||
|
pub fn is_default(&self) -> bool {
|
||||||
|
self.date_of_creation.is_empty()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// # Struct for the 2nd level in json conf file
|
/// # Struct for the 2nd level in json conf file
|
||||||
/// ## for each process to contain info, such as name, path and dependencies
|
/// ## for each process to contain info, such as name, path and dependencies
|
||||||
///
|
///
|
||||||
|
|
@ -135,7 +258,7 @@ pub struct Files {
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
pub struct Services {
|
pub struct Services {
|
||||||
pub hostname: String,
|
pub hostname: String,
|
||||||
pub port: u32,
|
pub port: Option<u32>,
|
||||||
pub triggers: ServiceTriggers,
|
pub triggers: ServiceTriggers,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -159,7 +282,6 @@ pub struct Services {
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
pub struct ServiceTriggers {
|
pub struct ServiceTriggers {
|
||||||
pub wait: u32,
|
pub wait: u32,
|
||||||
pub delay: u32,
|
|
||||||
#[serde(rename = "onLost")]
|
#[serde(rename = "onLost")]
|
||||||
pub on_lost: String,
|
pub on_lost: String,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,25 +6,207 @@ pub mod services;
|
||||||
|
|
||||||
// TODO : saving current flags state
|
// TODO : saving current flags state
|
||||||
|
|
||||||
use crate::options::structs::CustomError;
|
use crate::options::structs::{CustomError, TrackingProcess, Processes};
|
||||||
use crate::options::structs::TrackingProcess;
|
// use files::create_watcher;
|
||||||
use files::create_watcher;
|
// use files::file_handler;
|
||||||
use files::file_handler;
|
// use inotify::Inotify;
|
||||||
use inotify::Inotify;
|
use log::{error, warn, info};
|
||||||
use log::{error, warn};
|
|
||||||
use prcs::{
|
use prcs::{
|
||||||
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
|
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
|
||||||
unfreeze_process,
|
unfreeze_process,
|
||||||
};
|
};
|
||||||
use services::service_handler;
|
// use services::service_handler;
|
||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::join;
|
// use tokio::join;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
|
// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
|
||||||
|
// controllers import
|
||||||
|
use prcs::v2::ProcessesController;
|
||||||
|
use files::v2::FilesController;
|
||||||
|
use services::v2::ServicesController;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
const GET_ID_CMD: &str = "hostname";
|
const GET_ID_CMD: &str = "hostname";
|
||||||
|
|
||||||
|
pub mod v2 {
|
||||||
|
use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
|
||||||
|
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum ControllerResult {
|
||||||
|
Process(Option<ProcessesController>),
|
||||||
|
File(Option<FilesController>),
|
||||||
|
Service(Option<ServicesController>),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Supervisor {
|
||||||
|
prcs : LinkedList<ProcessesController>,
|
||||||
|
files : LinkedList<FilesController>,
|
||||||
|
services : LinkedList<ServicesController>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Supervisor {
|
||||||
|
pub fn new() -> Supervisor {
|
||||||
|
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
|
||||||
|
}
|
||||||
|
pub async fn with_config(mut self, config: &Processes) -> Supervisor {
|
||||||
|
let _ = config.processes.iter()
|
||||||
|
.for_each(|prc| {
|
||||||
|
let (rx, tx) = mpsc::channel::<Events>(10);
|
||||||
|
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
|
||||||
|
if !self.prcs.contains(&temp) {
|
||||||
|
self.prcs.push_back(temp);
|
||||||
|
}
|
||||||
|
let rx = Arc::new(rx);
|
||||||
|
let proc_name: Arc<str> = Arc::from(prc.name.clone());
|
||||||
|
|
||||||
|
let _ = prc.dependencies.files.iter()
|
||||||
|
.for_each(|file| {
|
||||||
|
let mut hm = HashMap::new();
|
||||||
|
let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())};
|
||||||
|
hm.insert(proc_name.clone(), (triggers, rx.clone()));
|
||||||
|
|
||||||
|
let tempfile = FilesController::new(&file.filename.as_str(), hm)
|
||||||
|
.with_path(&file.src);
|
||||||
|
|
||||||
|
|
||||||
|
if let Ok(file) = tempfile {
|
||||||
|
if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) {
|
||||||
|
current_file.add_event(file);
|
||||||
|
} else {
|
||||||
|
self.files.push_back(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// servs
|
||||||
|
let _ = prc.dependencies.services.iter()
|
||||||
|
.for_each(|serv| {
|
||||||
|
let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref());
|
||||||
|
// preparations
|
||||||
|
let rx = rx.clone();
|
||||||
|
let serv_cont = ServicesController::new().with_access_name(
|
||||||
|
&serv.hostname,
|
||||||
|
&access_url
|
||||||
|
);
|
||||||
|
// triggers
|
||||||
|
let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone());
|
||||||
|
let triggers = Triggers::new_service(arc, serv.triggers.wait);
|
||||||
|
|
||||||
|
if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
|
||||||
|
proc.add_process(&prc.name, triggers, rx);
|
||||||
|
} else {
|
||||||
|
// vecdeque for queue
|
||||||
|
let mut vec: VecDeque<Arc<str>> = VecDeque::new();
|
||||||
|
vec.push_back(proc_name.clone());
|
||||||
|
// connection_queue
|
||||||
|
let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> = BTreeMap::new();
|
||||||
|
connection_queue.insert(serv.triggers.wait, vec);
|
||||||
|
// event_reg
|
||||||
|
let mut hm = HashMap::new();
|
||||||
|
hm.insert(proc_name.clone(), (triggers, rx));
|
||||||
|
|
||||||
|
let serv_cont = serv_cont.with_params(connection_queue, hm);
|
||||||
|
self.services.push_back(serv_cont);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
self
|
||||||
|
}
|
||||||
|
pub fn get_stats(&self) -> String {
|
||||||
|
format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ProcessUnit for Supervisor {
|
||||||
|
async fn process(&mut self) {
|
||||||
|
info!("Initializing monitoring ...");
|
||||||
|
loop {
|
||||||
|
// dbg!(&self);
|
||||||
|
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
|
||||||
|
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
|
||||||
|
// let res = tokio::join!(prc.process(), file.process(), serv.process());
|
||||||
|
if let Some(mut val) = self.prcs.pop_front() {
|
||||||
|
tasks.push(
|
||||||
|
tokio::spawn( async move {
|
||||||
|
val.process().await;
|
||||||
|
ControllerResult::Process(Some(val))
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(mut val) = self.files.pop_front() {
|
||||||
|
tasks.push(
|
||||||
|
tokio::spawn( async move {
|
||||||
|
val.process().await;
|
||||||
|
ControllerResult::File(Some(val))
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(mut val) = self.services.pop_front() {
|
||||||
|
tasks.push(
|
||||||
|
tokio::spawn( async move {
|
||||||
|
val.process().await;
|
||||||
|
ControllerResult::Service(Some(val))
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
for task in tasks {
|
||||||
|
match task.await {
|
||||||
|
Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val),
|
||||||
|
Ok(ControllerResult::File(Some(val))) => self.files.push_back(val),
|
||||||
|
Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val),
|
||||||
|
Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."),
|
||||||
|
_ => { /* DEAD END (CAN NOT BE EXECUTED) */},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// spawn tasks
|
||||||
|
// spawn prc
|
||||||
|
// spawn files
|
||||||
|
// spawn services
|
||||||
|
// ## for ... i.await in loop
|
||||||
|
pub async fn init_monitoring(
|
||||||
|
config: Processes
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut supervisor = Supervisor::new().with_config(&config).await;
|
||||||
|
info!("Monitoring: {} ", &supervisor.get_stats());
|
||||||
|
supervisor.process().await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// async fn generate_controllers<'a>(config: Processes) -> (HashSet<ProcessesController<'a>>, HashSet<FilesController<'a>>, HashSet<ServicesController<'a>>) {
|
||||||
|
// let mut prcs: HashSet<ProcessesController<'a>> = HashSet::new();
|
||||||
|
// let mut files: HashSet<FilesController<'a>> = HashSet::new();
|
||||||
|
// let mut services: HashSet<ServicesController<'a>> = HashSet::new();
|
||||||
|
// for prc in config.processes {
|
||||||
|
// let (rx, tx) = mpsc::channel::<Events<'a>>(10);
|
||||||
|
// // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path);
|
||||||
|
// let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path);
|
||||||
|
// let a = new_prc.process().await;
|
||||||
|
|
||||||
|
// }
|
||||||
|
// (prcs, files, services)
|
||||||
|
// }
|
||||||
|
// spawn prc check with semaphore check
|
||||||
|
async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) }
|
||||||
|
|
||||||
|
// spawn file check with semaphore check
|
||||||
|
async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) }
|
||||||
|
|
||||||
|
// spawn service check with semaphore check
|
||||||
|
async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) }
|
||||||
|
}
|
||||||
|
|
||||||
/// # Fn `run_daemons`
|
/// # Fn `run_daemons`
|
||||||
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
|
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
|
||||||
///
|
///
|
||||||
|
|
@ -40,37 +222,37 @@ const GET_ID_CMD: &str = "hostname";
|
||||||
///
|
///
|
||||||
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
|
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
|
||||||
///
|
///
|
||||||
pub async fn run_daemons(
|
// pub async fn run_daemons(
|
||||||
proc: Arc<TrackingProcess>,
|
// proc: Arc<TrackingProcess>,
|
||||||
tx: Arc<mpsc::Sender<u8>>,
|
// tx: Arc<mpsc::Sender<u8>>,
|
||||||
rx: &mut mpsc::Receiver<u8>,
|
// rx: &mut mpsc::Receiver<u8>,
|
||||||
) {
|
// ) {
|
||||||
// creating watchers + ---buffers---
|
// // creating watchers + ---buffers---
|
||||||
let mut watchers: Vec<Inotify> = vec![];
|
// let mut watchers: Vec<Inotify> = vec![];
|
||||||
for file in proc.dependencies.files.clone().into_iter() {
|
// for file in proc.dependencies.files.clone().into_iter() {
|
||||||
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
|
// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
|
||||||
watchers.push(watcher);
|
// watchers.push(watcher);
|
||||||
} else {
|
// } else {
|
||||||
let _ = tx.send(121).await;
|
// let _ = tx.send(121).await;
|
||||||
}
|
// }
|
||||||
// watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
|
// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
|
||||||
}
|
// }
|
||||||
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
|
// let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
|
||||||
Arc::new(tokio::sync::Mutex::new(watchers));
|
// Arc::new(tokio::sync::Mutex::new(watchers));
|
||||||
|
|
||||||
loop {
|
// loop {
|
||||||
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
|
// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
|
||||||
tokio::select! {
|
// tokio::select! {
|
||||||
_ = run_hand => continue,
|
// _ = run_hand => continue,
|
||||||
_val = rx.recv() => {
|
// _val = rx.recv() => {
|
||||||
if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
|
// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
},
|
// },
|
||||||
}
|
// }
|
||||||
tokio::task::yield_now().await;
|
// tokio::task::yield_now().await;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
|
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
|
||||||
match val {
|
match val {
|
||||||
|
|
@ -133,7 +315,6 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
|
||||||
},
|
},
|
||||||
// // 9 - File-dependency change -> staying (after check)
|
// // 9 - File-dependency change -> staying (after check)
|
||||||
9 => {
|
9 => {
|
||||||
// no need to trash logs
|
|
||||||
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
|
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
},
|
},
|
||||||
|
|
@ -201,36 +382,36 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
|
||||||
///
|
///
|
||||||
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
|
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
|
||||||
///
|
///
|
||||||
pub async fn running_handler(
|
// pub async fn running_handler(
|
||||||
prc: Arc<TrackingProcess>,
|
// prc: Arc<TrackingProcess>,
|
||||||
tx: Arc<mpsc::Sender<u8>>,
|
// tx: Arc<mpsc::Sender<u8>>,
|
||||||
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
// watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
||||||
) {
|
// ) {
|
||||||
// services and files check (once)
|
// // services and files check (once)
|
||||||
let files_check = file_handler(
|
// let files_check = file_handler(
|
||||||
&prc.name,
|
// &prc.name,
|
||||||
&prc.dependencies.files,
|
// &prc.dependencies.files,
|
||||||
tx.clone(),
|
// tx.clone(),
|
||||||
watchers.clone(),
|
// watchers.clone(),
|
||||||
);
|
// );
|
||||||
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
|
// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
|
||||||
|
|
||||||
let res = join!(files_check, services_check);
|
// let res = join!(files_check, services_check);
|
||||||
// if inactive -> spawn checks -> active is true
|
// // if inactive -> spawn checks -> active is true
|
||||||
if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
||||||
if start_process(&prc.name, &prc.path).await.is_err() {
|
// if start_process(&prc.name, &prc.path).await.is_err() {
|
||||||
tx.send(3).await.unwrap();
|
// tx.send(3).await.unwrap();
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
// if frozen -> spawn checks -> unfreeze is true
|
// // if frozen -> spawn checks -> unfreeze is true
|
||||||
else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
||||||
tx.send(10).await.unwrap();
|
// tx.send(10).await.unwrap();
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
// tokio::time::sleep(Duration::from_millis(100)).await;
|
// // tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
tokio::task::yield_now().await;
|
// tokio::task::yield_now().await;
|
||||||
}
|
// }
|
||||||
|
|
||||||
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
|
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
|
||||||
/// # Fn `get_container_id`
|
/// # Fn `get_container_id`
|
||||||
|
|
|
||||||
|
|
@ -1,183 +1,321 @@
|
||||||
use crate::options::structs::{CustomError, Files};
|
use crate::options::structs::{CustomError, Files};
|
||||||
use super::prcs::{is_active, is_frozen};
|
use super::prcs::{is_active, is_frozen};
|
||||||
use inotify::{EventMask, Inotify, WatchMask};
|
use inotify::{EventMask, Inotify, WatchMask};
|
||||||
use std::borrow::BorrowMut;
|
use std::borrow::BorrowMut;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::time::Duration;
|
use tokio::sync::mpsc::Sender as Sender;
|
||||||
|
use tokio::time::Duration;
|
||||||
|
use crate::options::structs::Events;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
/// # Fn `create_watcher`
|
pub mod v2 {
|
||||||
/// ## for creating watcher on file's delete | update events
|
use log::{error, info, warn};
|
||||||
///
|
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
|
||||||
/// *input* : `&str`, `&str`
|
use super::*;
|
||||||
///
|
use std::{collections::HashMap, path::Path};
|
||||||
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
|
|
||||||
///
|
|
||||||
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
|
|
||||||
///
|
|
||||||
/// *managing* : current file's name: &str, path in local storage to current file: &str
|
|
||||||
///
|
|
||||||
/// *depends on* : -
|
|
||||||
///
|
|
||||||
pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
|
|
||||||
let src = format!("{}{}", path, filename);
|
|
||||||
let inotify: Inotify = Inotify::init()?;
|
|
||||||
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
|
|
||||||
Ok(inotify)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// # Fn `create_watcher`
|
type MpscSender = Arc<Sender<Events>>;
|
||||||
/// ## for managing processes by checking dep files' states
|
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
|
||||||
///
|
|
||||||
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
#[derive(Debug)]
|
||||||
///
|
enum FileState {
|
||||||
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
|
Ok,
|
||||||
///
|
NotFound,
|
||||||
/// *initiator* : fn `utils::running_handler`
|
}
|
||||||
///
|
|
||||||
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
#[derive(Debug)]
|
||||||
///
|
pub struct FilesController {
|
||||||
/// *depends on* : Files
|
name : Arc<str>,
|
||||||
///
|
path : String,
|
||||||
pub async fn file_handler(
|
code_name : Arc<str>,
|
||||||
name: &str,
|
state : FileState,
|
||||||
files: &[Files],
|
watcher : Option<Inotify>,
|
||||||
tx: Arc<mpsc::Sender<u8>>,
|
triggers : EventHandlers,
|
||||||
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
}
|
||||||
) -> Result<(), CustomError> {
|
|
||||||
for (i, file) in files.iter().enumerate() {
|
impl PartialEq for FilesController {
|
||||||
// let src = format!("{}{}", file.src, file.filename);
|
fn eq(&self, other: &Self) -> bool {
|
||||||
if check_file(&file.filename, &file.src).await.is_err() {
|
self.code_name == other.code_name
|
||||||
if !is_active(name).await || is_frozen(name).await {
|
|
||||||
return Err(CustomError::Fatal);
|
|
||||||
}
|
}
|
||||||
match file.triggers.on_delete.as_str() {
|
}
|
||||||
"stay" => {
|
|
||||||
tx.send(9).await.unwrap();
|
impl FilesController {
|
||||||
continue;
|
pub fn new(name: &str, triggers: EventHandlers) -> FilesController {
|
||||||
}
|
let name: Arc<str> = Arc::from(name);
|
||||||
"stop" => {
|
Self {
|
||||||
if is_active(name).await {
|
name : name.clone(),
|
||||||
tx.send(1).await.unwrap();
|
path : String::new(),
|
||||||
}
|
state : FileState::Ok,
|
||||||
return Err(CustomError::Fatal);
|
watcher : None,
|
||||||
}
|
triggers,
|
||||||
"hold" => {
|
code_name : name.clone(),
|
||||||
if is_active(name).await {
|
|
||||||
tx.send(2).await.unwrap();
|
|
||||||
return Err(CustomError::Fatal);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
||||||
tx.send(101).await.unwrap();
|
|
||||||
return Err(CustomError::Fatal);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if is_active(name).await && !is_frozen(name).await {
|
pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController> {
|
||||||
let watchers = watchers.clone();
|
self.path = path.as_ref().to_string_lossy().into_owned();
|
||||||
// println!("mutex: {:?}", watchers);
|
self.watcher = {
|
||||||
let mut buffer = [0; 128];
|
match create_watcher(&self.name, &self.path) {
|
||||||
let mut mutex_guard = watchers.lock().await;
|
Ok(val) => Some(val),
|
||||||
if let Some(notify) = mutex_guard.get_mut(i) {
|
Err(er) => {
|
||||||
let events = notify.read_events(&mut buffer);
|
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
|
||||||
// println!("{:?}", events);
|
return Err(er)
|
||||||
if events.is_ok() {
|
|
||||||
let events: Vec<EventMask> = events
|
|
||||||
.unwrap()
|
|
||||||
.map(|mask| mask.mask)
|
|
||||||
.filter(|mask| {
|
|
||||||
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
for event in events {
|
|
||||||
if let EventMask::DELETE_SELF = event {
|
|
||||||
// ! warning (DELETE_SELF event) !
|
|
||||||
// println!("! warning (DELETE_SELF event) !");
|
|
||||||
// * watcher recreation after dealing with file recreation mechanism in text editors
|
|
||||||
let mutex = notify.borrow_mut();
|
|
||||||
|
|
||||||
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
|
|
||||||
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
|
|
||||||
*mutex = watcher;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
match file.triggers.on_change.as_str() {
|
}
|
||||||
"stop" => {
|
};
|
||||||
let _ = tx.send(7).await;
|
self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name));
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
pub fn add_event(&mut self, file_controller : FilesController) {
|
||||||
|
for (k, v) in file_controller.triggers {
|
||||||
|
self.triggers.entry(k).or_insert(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
|
||||||
|
for (prc_name, (triggers, channel)) in &self.triggers {
|
||||||
|
let msg = match &trigger_type {
|
||||||
|
None => {
|
||||||
|
Events::Positive(self.code_name.clone())
|
||||||
|
},
|
||||||
|
Some(event) => {
|
||||||
|
info!("Event on file {} ({}) : {}. Notifying `{}` ...", &self.name, &self.path, event, &prc_name);
|
||||||
|
event.event_from_file_trigger_controller(self.code_name.clone(), &triggers)
|
||||||
|
},
|
||||||
|
};
|
||||||
|
let _ = channel.send(msg).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[async_trait]
|
||||||
|
impl ProcessUnit for FilesController {
|
||||||
|
async fn process(&mut self) {
|
||||||
|
// polling file check
|
||||||
|
// 1) existing check
|
||||||
|
// dbg!(&self);
|
||||||
|
if let Ok(_) = check_file(&self.name, &self.path).await {
|
||||||
|
if let FileState::NotFound = self.state {
|
||||||
|
info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name);
|
||||||
|
self.state = FileState::Ok;
|
||||||
|
// reseting negative outcome in prc
|
||||||
|
self.trigger_on(None).await;
|
||||||
|
}
|
||||||
|
match &mut self.watcher {
|
||||||
|
Some(notify) => {
|
||||||
|
let mut buffer = [0; 1024];
|
||||||
|
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
|
||||||
|
// notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);});
|
||||||
|
// todo!();
|
||||||
|
if let (recreate_watcher, true) = (
|
||||||
|
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
|
||||||
|
notif_events.any(|mask| mask.mask == EventMask::MODIFY)
|
||||||
|
) {
|
||||||
|
warn!("File {} ({}) was changed", self.name, &self.path);
|
||||||
|
if recreate_watcher {
|
||||||
|
self.watcher = match create_watcher(&self.name, &self.path) {
|
||||||
|
Ok(notifier) => Some(notifier),
|
||||||
|
Err(er) => {
|
||||||
|
error!("Failed to recreate watcher for {} ({}) due to {}",
|
||||||
|
self.name,
|
||||||
|
&self.path,
|
||||||
|
er
|
||||||
|
);
|
||||||
|
None
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.trigger_on(Some(FileTriggerType::OnChange)).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
"restart" => {
|
},
|
||||||
let _ = tx.send(8).await;
|
None => { /* DEAD END */},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if let FileState::Ok = self.state {
|
||||||
|
warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
|
||||||
|
self.state = FileState::NotFound;
|
||||||
|
self.trigger_on(Some(FileTriggerType::OnDelete)).await;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.trigger_on(None).await;
|
||||||
|
// 2) change check
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # Fn `create_watcher`
|
||||||
|
/// ## for creating watcher on file's delete | update events
|
||||||
|
///
|
||||||
|
/// *input* : `&str`, `&str`
|
||||||
|
///
|
||||||
|
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
|
||||||
|
///
|
||||||
|
/// *managing* : current file's name: &str, path in local storage to current file: &str
|
||||||
|
///
|
||||||
|
/// *depends on* : -
|
||||||
|
///
|
||||||
|
pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
|
||||||
|
let src = format!("{}{}", path, filename);
|
||||||
|
let inotify: Inotify = Inotify::init()?;
|
||||||
|
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
|
||||||
|
Ok(inotify)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # Fn `create_watcher`
|
||||||
|
/// ## for managing processes by checking dep files' states
|
||||||
|
///
|
||||||
|
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
||||||
|
///
|
||||||
|
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `utils::running_handler`
|
||||||
|
///
|
||||||
|
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
||||||
|
///
|
||||||
|
/// *depends on* : Files
|
||||||
|
///
|
||||||
|
pub async fn file_handler(
|
||||||
|
name: &str,
|
||||||
|
files: &[Files],
|
||||||
|
tx: Arc<mpsc::Sender<u8>>,
|
||||||
|
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
for (i, file) in files.iter().enumerate() {
|
||||||
|
// let src = format!("{}{}", file.src, file.filename);
|
||||||
|
if check_file(&file.filename, &file.src).await.is_err() {
|
||||||
|
if !is_active(name).await || is_frozen(name).await {
|
||||||
|
return Err(anyhow::Error::msg("Process is frozen or stopped"));
|
||||||
|
}
|
||||||
|
match file.triggers.on_delete.as_str() {
|
||||||
|
"stay" => {
|
||||||
|
tx.send(9).await.unwrap();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
"stop" => {
|
||||||
|
if is_active(name).await {
|
||||||
|
tx.send(1).await.unwrap();
|
||||||
|
}
|
||||||
|
return Err(anyhow::Error::msg("Process was stopped"));
|
||||||
|
}
|
||||||
|
"hold" => {
|
||||||
|
if is_active(name).await {
|
||||||
|
tx.send(2).await.unwrap();
|
||||||
|
return Err(anyhow::Error::msg("Process was frozen"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||||
|
tx.send(101).await.unwrap();
|
||||||
|
return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if is_active(name).await && !is_frozen(name).await {
|
||||||
|
let watchers = watchers.clone();
|
||||||
|
// println!("mutex: {:?}", watchers);
|
||||||
|
let mut buffer = [0; 128];
|
||||||
|
let mut mutex_guard = watchers.lock().await;
|
||||||
|
if let Some(notify) = mutex_guard.get_mut(i) {
|
||||||
|
let events = notify.read_events(&mut buffer);
|
||||||
|
// println!("{:?}", events);
|
||||||
|
if events.is_ok() {
|
||||||
|
let events: Vec<EventMask> = events
|
||||||
|
.unwrap()
|
||||||
|
.map(|mask| mask.mask)
|
||||||
|
.filter(|mask| {
|
||||||
|
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
for event in events {
|
||||||
|
if let EventMask::DELETE_SELF = event {
|
||||||
|
// ! warning (DELETE_SELF event) !
|
||||||
|
// println!("! warning (DELETE_SELF event) !");
|
||||||
|
// * watcher recreation after dealing with file recreation mechanism in text editors
|
||||||
|
let mutex = notify.borrow_mut();
|
||||||
|
|
||||||
|
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
|
||||||
|
if let Ok(watcher) = create_watcher(&file.filename, &file.src) {
|
||||||
|
*mutex = watcher;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
"stay" => {
|
match file.triggers.on_change.as_str() {
|
||||||
let _ = tx.send(9).await;
|
"stop" => {
|
||||||
}
|
let _ = tx.send(7).await;
|
||||||
_ => {
|
}
|
||||||
let _ = tx.send(101).await;
|
"restart" => {
|
||||||
|
let _ = tx.send(8).await;
|
||||||
|
}
|
||||||
|
"stay" => {
|
||||||
|
let _ = tx.send(9).await;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
let _ = tx.send(101).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
tokio::task::yield_now().await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// # Fn `check_file`
|
/// # Fn `check_file`
|
||||||
/// ## for checking existance of current file
|
/// ## for checking existance of current file
|
||||||
///
|
///
|
||||||
/// *input* : `&str`, `&str`
|
/// *input* : `&str`, `&str`
|
||||||
///
|
///
|
||||||
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
|
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
|
||||||
///
|
///
|
||||||
/// *initiator* : fn `file_handler`
|
/// *initiator* : fn `file_handler`
|
||||||
///
|
///
|
||||||
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
|
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
|
||||||
///
|
///
|
||||||
/// *depends on* : network activity
|
/// *depends on* : network activity
|
||||||
///
|
///
|
||||||
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
|
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
|
||||||
let arc_name = Arc::new(filename.to_string());
|
let arc_name = Arc::new(filename.to_string());
|
||||||
let arc_path = Arc::new(path.to_string());
|
let arc_path = Arc::new(path.to_string());
|
||||||
tokio::task::spawn_blocking(move || {
|
tokio::task::spawn_blocking(move || {
|
||||||
let file_concat = format!("{}{}", arc_path, arc_name);
|
let file_concat = format!("{}{}", arc_path, arc_name);
|
||||||
let path = Path::new(&file_concat);
|
let path = Path::new(&file_concat);
|
||||||
if path.exists() {
|
if path.exists() {
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(CustomError::Fatal)
|
Err(CustomError::Fatal)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
panic!("Corrupted while file check process");
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod files_unittests {
|
||||||
|
use super::*;
|
||||||
|
#[tokio::test]
|
||||||
|
async fn try_to_create_watcher() {
|
||||||
|
let res = create_watcher("dep-file", "./tests/examples/");
|
||||||
|
assert!(res.is_ok());
|
||||||
|
}
|
||||||
|
#[tokio::test]
|
||||||
|
async fn try_to_create_invalid_watcher() {
|
||||||
|
let res = create_watcher("invalid-file", "/path/to/the/no/dir");
|
||||||
|
assert!(res.is_err());
|
||||||
|
}
|
||||||
|
#[tokio::test]
|
||||||
|
async fn check_existing_file() {
|
||||||
|
let res = check_file("dep-file", "./tests/examples/").await;
|
||||||
|
assert!(res.is_ok());
|
||||||
|
}
|
||||||
|
#[tokio::test]
|
||||||
|
async fn check_non_existing_file() {
|
||||||
|
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
|
||||||
|
assert!(res.is_err());
|
||||||
}
|
}
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap_or_else(|_| {
|
|
||||||
panic!("Corrupted while file check process");
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod files_unittests {
|
|
||||||
use super::*;
|
|
||||||
#[tokio::test]
|
|
||||||
async fn try_to_create_watcher() {
|
|
||||||
let res = create_watcher("dep-file", "./tests/examples/").await;
|
|
||||||
assert!(res.is_ok());
|
|
||||||
}
|
}
|
||||||
#[tokio::test]
|
|
||||||
async fn try_to_create_invalid_watcher() {
|
|
||||||
let res = create_watcher("invalid-file", "/path/to/the/no/dir").await;
|
|
||||||
assert!(res.is_err());
|
|
||||||
}
|
|
||||||
#[tokio::test]
|
|
||||||
async fn check_existing_file() {
|
|
||||||
let res = check_file("dep-file", "./tests/examples/").await;
|
|
||||||
assert!(res.is_ok());
|
|
||||||
}
|
|
||||||
#[tokio::test]
|
|
||||||
async fn check_non_existing_file() {
|
|
||||||
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
|
|
||||||
assert!(res.is_err());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,132 @@
|
||||||
use crate::options::structs::CustomError;
|
|
||||||
use log::{error, warn};
|
use log::{error, warn};
|
||||||
use std::process::{Command, Output};
|
use std::process::{Command, Output};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
|
use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit};
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use tokio::sync::mpsc::Receiver as MpscReciever;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
pub mod v2 {
|
||||||
|
use log::info;
|
||||||
|
use crate::options::structs::DependencyType;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ProcessesController {
|
||||||
|
name: Arc<str>,
|
||||||
|
bin: String,
|
||||||
|
// obj: Arc<TrackingProcess>,
|
||||||
|
state: ProcessState,
|
||||||
|
event_reader: MpscReciever<Events>,
|
||||||
|
negative_events: HashSet<Arc<str>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for ProcessesController {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.bin == other.bin
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProcessesController {
|
||||||
|
pub fn new(name: &str, event_reader: MpscReciever<Events>) -> ProcessesController {
|
||||||
|
ProcessesController {
|
||||||
|
name : Arc::from(name),
|
||||||
|
bin : String::new(),
|
||||||
|
state : ProcessState::Stopped,
|
||||||
|
event_reader,
|
||||||
|
negative_events : HashSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController {
|
||||||
|
self.bin = bin.as_ref().to_string_lossy().into_owned();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
|
||||||
|
match trigger {
|
||||||
|
"stay" => {
|
||||||
|
info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name);
|
||||||
|
},
|
||||||
|
"stop" => {
|
||||||
|
if is_active(&self.name).await {
|
||||||
|
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
|
||||||
|
terminate_process(&self.name).await;
|
||||||
|
self.state = ProcessState::Stopped;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"hold" => {
|
||||||
|
if !is_frozen(&self.name).await {
|
||||||
|
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
|
||||||
|
freeze_process(&self.name).await;
|
||||||
|
self.state = ProcessState::Holding;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"restart" => {
|
||||||
|
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
|
||||||
|
let _ = restart_process(&self.name, &self.bin).await;
|
||||||
|
},
|
||||||
|
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_micros(100)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ProcessUnit for ProcessesController {
|
||||||
|
async fn process(&mut self) {
|
||||||
|
if self.negative_events.len() == 0 {
|
||||||
|
match self.state {
|
||||||
|
ProcessState::Holding => {
|
||||||
|
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
|
||||||
|
if let Err(er) = unfreeze_process(&self.name).await {
|
||||||
|
error!("Cannot unfreeze process {} : {}", self.name, er);
|
||||||
|
} else {
|
||||||
|
self.state = ProcessState::Pending;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ProcessState::Stopped => {
|
||||||
|
info!("No negative dependecies events on {} process. Starting ...", self.name);
|
||||||
|
if let Err(er) = start_process(&self.name, &self.bin).await {
|
||||||
|
error!("Cannot start process {} : {}", self.name, er);
|
||||||
|
} else {
|
||||||
|
self.state = ProcessState::Pending;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while let Ok(event) = self.event_reader.try_recv() {
|
||||||
|
match event {
|
||||||
|
Events::Positive(target) => {
|
||||||
|
if self.negative_events.contains(&target) {
|
||||||
|
self.negative_events.remove(&target);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Events::Negative(event) => {
|
||||||
|
match event {
|
||||||
|
NegativeOutcomes::FileWasChanged(target, dep_type, trigger) |
|
||||||
|
NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) |
|
||||||
|
NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
|
||||||
|
if !self.negative_events.contains(&target) {
|
||||||
|
self.negative_events.insert(target.clone());
|
||||||
|
|
||||||
|
self.trigger_on(
|
||||||
|
&target,
|
||||||
|
&trigger,
|
||||||
|
dep_type
|
||||||
|
).await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// # Fn `get_pid`
|
/// # Fn `get_pid`
|
||||||
/// ## for initializing process of unstoppable grubbing metrics.
|
/// ## for initializing process of unstoppable grubbing metrics.
|
||||||
|
|
@ -162,14 +286,11 @@ pub async fn freeze_process(name: &str) {
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
pub async fn unfreeze_process(name: &str) {
|
pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> {
|
||||||
let _ = Command::new("pkill")
|
let _ = Command::new("pkill")
|
||||||
.args(["-CONT", name])
|
.args(["-CONT", name])
|
||||||
.output()
|
.output()?;
|
||||||
.unwrap_or_else(|_| {
|
Ok(())
|
||||||
error!("Failed to unfreeze process");
|
|
||||||
std::process::exit(101);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # Fn `restart_process`
|
/// # Fn `restart_process`
|
||||||
|
|
@ -185,7 +306,7 @@ pub async fn unfreeze_process(name: &str) {
|
||||||
///
|
///
|
||||||
/// *depends on* : fn `start_process`, fn `terminate_process`
|
/// *depends on* : fn `start_process`, fn `terminate_process`
|
||||||
///
|
///
|
||||||
pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> {
|
pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> {
|
||||||
terminate_process(name).await;
|
terminate_process(name).await;
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
start_process(name, path).await
|
start_process(name, path).await
|
||||||
|
|
@ -204,7 +325,7 @@ pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError>
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
|
pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> {
|
||||||
// let runsh = format!("{} {}", "exec", path);
|
// let runsh = format!("{} {}", "exec", path);
|
||||||
let mut command = Command::new(path);
|
let mut command = Command::new(path);
|
||||||
// command.arg(path);
|
// command.arg(path);
|
||||||
|
|
@ -215,8 +336,7 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
println!("{:?}", er);
|
Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er)))
|
||||||
Err(CustomError::Fatal)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,193 @@
|
||||||
use crate::options::structs::{CustomError, Services};
|
use crate::options::structs::CustomError;
|
||||||
use super::prcs::{is_active, is_frozen};
|
|
||||||
use log::{error, warn};
|
use log::{error, warn};
|
||||||
use std::net::{TcpStream, ToSocketAddrs};
|
use std::net::{TcpStream, ToSocketAddrs};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::time::Duration;
|
||||||
use tokio::time::{Duration, Instant};
|
use tokio::sync::mpsc::Sender as Sender;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
pub mod v2 {
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use std::collections::{HashMap, BTreeMap, VecDeque};
|
||||||
|
|
||||||
|
type MpscSender = Arc<Sender<Events>>;
|
||||||
|
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
|
||||||
|
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
|
||||||
|
// type wrapper for service wait queue
|
||||||
|
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ServicesController {
|
||||||
|
// i.e. yandex.ru
|
||||||
|
#[allow(unused)]
|
||||||
|
name : String,
|
||||||
|
// i.e. yandex.ru:443
|
||||||
|
access_url : Arc<str>,
|
||||||
|
// "OK" or "Unavailable"
|
||||||
|
state: ServiceState,
|
||||||
|
// btree map with key as max wait time and it's key to hashmap
|
||||||
|
config: ConnectionQueue,
|
||||||
|
// Map of processes with their (trigger and mpsc sender)
|
||||||
|
event_registrator : EventHandlers,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for ServicesController {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.access_url == other.access_url
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServicesController {
|
||||||
|
pub fn new() -> ServicesController {
|
||||||
|
ServicesController {
|
||||||
|
name : String::new(),
|
||||||
|
access_url : Arc::from(String::new()),
|
||||||
|
state : ServiceState::Unavailable,
|
||||||
|
config: ConnectionQueue::new(),
|
||||||
|
event_registrator : EventHandlers::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn with_access_name(
|
||||||
|
mut self,
|
||||||
|
hostname: &str,
|
||||||
|
access_url: &str,
|
||||||
|
) -> ServicesController {
|
||||||
|
self.name = hostname.to_string();
|
||||||
|
self.access_url = Arc::from(access_url);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_params(
|
||||||
|
mut self,
|
||||||
|
conn_queue: ConnectionQueue,
|
||||||
|
event_reg: EventHandlers,
|
||||||
|
) -> ServicesController {
|
||||||
|
self.config = conn_queue;
|
||||||
|
self.event_registrator = event_reg;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
|
||||||
|
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
|
||||||
|
}
|
||||||
|
pub fn add_process(
|
||||||
|
&mut self,
|
||||||
|
proc_name: &str,
|
||||||
|
trigger: Triggers,
|
||||||
|
sender: MpscSender,
|
||||||
|
) {
|
||||||
|
let proc_name: Arc<str> = Arc::from(proc_name);
|
||||||
|
// queue add
|
||||||
|
if let Triggers::Service { wait, .. } = trigger {
|
||||||
|
self.config.entry(wait)
|
||||||
|
.and_modify(|el| el.push_back(proc_name.clone()))
|
||||||
|
.or_insert({
|
||||||
|
let mut temp = VecDeque::new();
|
||||||
|
temp.push_back(proc_name.clone());
|
||||||
|
temp
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// event add
|
||||||
|
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
|
||||||
|
}
|
||||||
|
async fn check_state(&self) -> anyhow::Result<()> {
|
||||||
|
let mut addrs = self.access_url.to_socket_addrs()?;
|
||||||
|
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
|
||||||
|
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn trigger_on(&mut self) {
|
||||||
|
match self.state {
|
||||||
|
ServiceState::Ok => {
|
||||||
|
let _ = self.event_registrator
|
||||||
|
.iter()
|
||||||
|
.map(|(_, (_, el))| async {
|
||||||
|
let _ = el.send(Events::Positive(self.access_url.clone())).await;
|
||||||
|
});
|
||||||
|
},
|
||||||
|
ServiceState::Unavailable => {
|
||||||
|
// looped check and notifying
|
||||||
|
self.looped_check().await;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn looped_check(self: &mut Self) {
|
||||||
|
let longest = self.config.last_entry().unwrap();
|
||||||
|
let longest = longest.key();
|
||||||
|
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
|
||||||
|
let timer = tokio::time::Instant::now();
|
||||||
|
let mut attempt: u32 = 1;
|
||||||
|
let access_url = Arc::new(self.access_url.clone());
|
||||||
|
// let event_registrator = &mut self.event_registrator;
|
||||||
|
|
||||||
|
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
|
||||||
|
// let access_url = access_url.clone();
|
||||||
|
loop {
|
||||||
|
interapter.tick().await;
|
||||||
|
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
|
||||||
|
attempt += 1;
|
||||||
|
|
||||||
|
let state_check_result = self.check_state().await;
|
||||||
|
|
||||||
|
if state_check_result.is_ok() {
|
||||||
|
info!("Connection to {} is `OK` now", &access_url);
|
||||||
|
self.state = ServiceState::Ok;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
let now = timer.elapsed();
|
||||||
|
let iterator = self.config.iter()
|
||||||
|
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
|
||||||
|
.flat_map(|(_, a)| a.iter().cloned())
|
||||||
|
.collect::<VecDeque<Arc<str>>>();
|
||||||
|
|
||||||
|
for name in iterator {
|
||||||
|
let proc_name = name.to_string();
|
||||||
|
info!("Trying to notify process `{}` ...", &proc_name);
|
||||||
|
let sender_opt = self.event_registrator.get(&name)
|
||||||
|
.map(|(trigger, sender)|
|
||||||
|
(trigger.to_service_negative_event(name.clone()), sender)
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some((tr, tx)) = sender_opt {
|
||||||
|
let _ = tx.send(tr.unwrap()).await;
|
||||||
|
} else {
|
||||||
|
error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).await {
|
||||||
|
info!("Timeout of establishing connection to {}. ", &access_url);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[async_trait]
|
||||||
|
impl ProcessUnit for ServicesController {
|
||||||
|
async fn process(&mut self) {
|
||||||
|
// check_service(hostname, port)
|
||||||
|
let current_state = self.check_state().await;
|
||||||
|
match (&self.state, current_state) {
|
||||||
|
(ServiceState::Unavailable, Ok(_)) => {
|
||||||
|
warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
|
||||||
|
self.state = ServiceState::Ok;
|
||||||
|
self.trigger_on().await;
|
||||||
|
},
|
||||||
|
(ServiceState::Ok, Err(_)) => {
|
||||||
|
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
|
||||||
|
self.state = ServiceState::Unavailable;
|
||||||
|
self.trigger_on().await;
|
||||||
|
},
|
||||||
|
(ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url),
|
||||||
|
_ => { /* DEAD END WITH NO INTEREST */ },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// # Fn `service_handler`
|
/// # Fn `service_handler`
|
||||||
/// ## function to realize mechanism of current process' dep services monitoring
|
/// ## function to realize mechanism of current process' dep services monitoring
|
||||||
|
|
@ -19,53 +202,53 @@ use tokio::time::{Duration, Instant};
|
||||||
///
|
///
|
||||||
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
|
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
|
||||||
///
|
///
|
||||||
pub async fn service_handler(
|
// pub async fn service_handler(
|
||||||
name: &str,
|
// name: &str,
|
||||||
services: &Vec<Services>,
|
// services: &Vec<Services>,
|
||||||
tx: Arc<mpsc::Sender<u8>>,
|
// tx: Arc<mpsc::Sender<u8>>,
|
||||||
) -> Result<(), CustomError> {
|
// ) -> Result<(), CustomError> {
|
||||||
// println!("service daemon on {}", name);
|
// // println!("service daemon on {}", name);
|
||||||
for serv in services {
|
// for serv in services {
|
||||||
if check_service(&serv.hostname, &serv.port).await.is_err() {
|
// if check_service(&serv.hostname, &serv.port).await.is_err() {
|
||||||
if !is_active(name).await || is_frozen(name).await {
|
// if !is_active(name).await || is_frozen(name).await {
|
||||||
return Err(CustomError::Fatal);
|
// return Err(CustomError::Fatal);
|
||||||
}
|
// }
|
||||||
error!(
|
// error!(
|
||||||
"Service {}:{} is unreachable for process {}",
|
// "Service {}:{} is unreachable for process {}",
|
||||||
&serv.hostname, &serv.port, &name
|
// &serv.hostname, &serv.port, &name
|
||||||
);
|
// );
|
||||||
match serv.triggers.on_lost.as_str() {
|
// match serv.triggers.on_lost.as_str() {
|
||||||
"stay" => {
|
// "stay" => {
|
||||||
tx.send(4).await.unwrap();
|
// tx.send(4).await.unwrap();
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
"stop" => {
|
// "stop" => {
|
||||||
if looped_service_connecting(name, serv).await.is_err() {
|
// if looped_service_connecting(name, serv).await.is_err() {
|
||||||
tx.send(5).await.unwrap();
|
// tx.send(5).await.unwrap();
|
||||||
tokio::task::yield_now().await;
|
// tokio::task::yield_now().await;
|
||||||
return Err(CustomError::Fatal);
|
// return Err(CustomError::Fatal);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
"hold" => {
|
// "hold" => {
|
||||||
// if is_frozen(name).await {
|
// // if is_frozen(name).await {
|
||||||
// return Err(CustomError::Fatal);
|
// // return Err(CustomError::Fatal);
|
||||||
// }
|
// // }
|
||||||
if looped_service_connecting(name, serv).await.is_err() {
|
// if looped_service_connecting(name, serv).await.is_err() {
|
||||||
tx.send(6).await.unwrap();
|
// tx.send(6).await.unwrap();
|
||||||
tokio::task::yield_now().await;
|
// tokio::task::yield_now().await;
|
||||||
return Err(CustomError::Fatal);
|
// return Err(CustomError::Fatal);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
_ => {
|
// _ => {
|
||||||
tx.send(101).await.unwrap();
|
// tx.send(101).await.unwrap();
|
||||||
return Err(CustomError::Fatal);
|
// return Err(CustomError::Fatal);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
// tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
Ok(())
|
// Ok(())
|
||||||
}
|
// }
|
||||||
|
|
||||||
/// # Fn `looped_service_connecting`
|
/// # Fn `looped_service_connecting`
|
||||||
/// ## for service's state check in loop (with delay and restriction of attempts)
|
/// ## for service's state check in loop (with delay and restriction of attempts)
|
||||||
|
|
@ -80,54 +263,54 @@ pub async fn service_handler(
|
||||||
///
|
///
|
||||||
/// *depends on* : fn `check_service`
|
/// *depends on* : fn `check_service`
|
||||||
///
|
///
|
||||||
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
|
// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
|
||||||
if serv.triggers.wait == 0 {
|
// if serv.triggers.wait == 0 {
|
||||||
loop {
|
// loop {
|
||||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||||
warn!(
|
// warn!(
|
||||||
"Attempting to connect from {} process to {}:{}",
|
// "Attempting to connect from {} process to {}:{}",
|
||||||
&name, &serv.hostname, &serv.port
|
// &name, &serv.hostname, &serv.port
|
||||||
);
|
// );
|
||||||
match check_service(&serv.hostname, &serv.port).await {
|
// match check_service(&serv.hostname, &serv.port).await {
|
||||||
Ok(_) => {
|
// Ok(_) => {
|
||||||
log::info!(
|
// log::info!(
|
||||||
"Successfully connected to {} from {} process!",
|
// "Successfully connected to {} from {} process!",
|
||||||
&serv.hostname,
|
// &serv.hostname,
|
||||||
&name
|
// &name
|
||||||
);
|
// );
|
||||||
break;
|
// break;
|
||||||
}
|
// }
|
||||||
Err(_) => {
|
// Err(_) => {
|
||||||
tokio::task::yield_now().await;
|
// tokio::task::yield_now().await;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
Ok(())
|
// Ok(())
|
||||||
} else {
|
// } else {
|
||||||
let start = Instant::now();
|
// let start = Instant::now();
|
||||||
while start.elapsed().as_secs() < serv.triggers.wait.into() {
|
// while start.elapsed().as_secs() < serv.triggers.wait.into() {
|
||||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||||
warn!(
|
// warn!(
|
||||||
"Attempting to connect from {} process to {}:{}",
|
// "Attempting to connect from {} process to {}:{}",
|
||||||
&name, &serv.hostname, &serv.port
|
// &name, &serv.hostname, &serv.port
|
||||||
);
|
// );
|
||||||
match check_service(&serv.hostname, &serv.port).await {
|
// match check_service(&serv.hostname, &serv.port).await {
|
||||||
Ok(_) => {
|
// Ok(_) => {
|
||||||
log::info!(
|
// log::info!(
|
||||||
"Successfully connected to {} from {} process!",
|
// "Successfully connected to {} from {} process!",
|
||||||
&serv.hostname,
|
// &serv.hostname,
|
||||||
&name
|
// &name
|
||||||
);
|
// );
|
||||||
return Ok(());
|
// return Ok(());
|
||||||
}
|
// }
|
||||||
Err(_) => {
|
// Err(_) => {
|
||||||
tokio::task::yield_now().await;
|
// tokio::task::yield_now().await;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
Err(CustomError::Fatal)
|
// Err(CustomError::Fatal)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
/// # Fn `check_service`
|
/// # Fn `check_service`
|
||||||
/// ## for check current service's availiability
|
/// ## for check current service's availiability
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue