monitor/noxis-rs/src/main.rs

157 lines
4.9 KiB
Rust

mod options;
mod utils;
use clap::Parser;
use log::{error, info};
use options::config::*;
use options::logger::setup_logger;
use options::signals::set_valid_destructor;
use options::structs::Processes;
use options::cli_pipeline::init_cli_pipeline;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use utils::*;
use options::preboot::PrebootParams;
use tokio::sync::{broadcast, oneshot};
use options::config::v2::init_config_mechanism;
use utils::v2::init_monitoring;
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()>{
let preboot = Arc::new(PrebootParams::parse().validate()?);
let _ = setup_logger();
info!("Noxis is configurating...");
//
let (tx_brd, mut rx_brd) = broadcast::channel::<Processes>(1);
// cli <-> config
let (tx_oneshot, rx_oneshot) = oneshot::channel::<Processes>();
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// initilaizing task for config manipulations
let config_module = tokio::spawn(async move {
let _ = init_config_mechanism(
rx_oneshot,
tx_brd,
preboot.clone()
).await;
});
handler.push(config_module);
// initilaizing task for cli manipulation
let cli_module = tokio::spawn(async move {
if let Err(er) = init_cli_pipeline().await {
error!("CLI pipeline failed due to {}", er)
}
});
handler.push(cli_module);
// initilaizing task for deinitializing `Noxis`
let ctrlc = tokio::spawn(async move {
if let Err(er) = set_valid_destructor(vec![].into()).await {
error!("Destructor mod failed due to {}", er);
}
std::process::exit(0);
});
handler.push(ctrlc);
let monitoring = tokio::spawn(async move {
let config = {
let mut tick = tokio::time::interval(Duration::from_millis(500));
loop {
tick.tick().await;
break match rx_brd.try_recv() {
Ok(conf) => conf,
Err(_) => continue,
}
}
};
if let Err(er) = init_monitoring(config).await {
error!("Monitoring mod failed due to {}", er);
}
});
handler.push(monitoring);
for i in handler {
let _ = i.await;
}
// setting up redis connection \
// then conf checks to choose the most actual \
// let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
// error!("No actual configuration for runner. Stopping...");
// std::process::exit(1);
// });
//
// info!(
// "Current runner configuration: {}",
// &processes.date_of_creation
// );
// info!("Runner is ready. Initializing...");
//
// if processes.processes.is_empty() {
// error!("Processes list is null, runner-rs initialization is stopped");
// return Err(Error::msg("Empty processes segment in config"));
// }
// let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// // is in need to send to the signals handler thread
// let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
//
// for proc in processes.processes.iter() {
// info!(
// "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
// proc.name,
// proc.path,
// proc.dependencies.files.len(),
// proc.dependencies.services.len()
// );
//
// // creating msg channel
// // can or should be executed in new thread
// let (tx, mut rx) = mpsc::channel::<u8>(1);
// let proc = Arc::new(proc.clone());
// let tx = Arc::new(tx.clone());
//
// senders.push(Arc::clone(&tx.clone()));
//
// let event = tokio::spawn(async move {
// run_daemons(proc.clone(), tx.clone(), &mut rx).await;
// });
// handler.push(event);
// }
//
// // destructor addition
// handler.push(tokio::spawn(async move {
// if set_valid_destructor(Arc::new(senders)).await.is_err() {
// error!("Linux signals handler creation failed. Terminating main thread...");
// return;
// }
//
// tokio::time::sleep(Duration::from_millis(200)).await;
// info!("End of job. Terminating main thread...");
// std::process::exit(0);
// }));
//
// // remote config update subscription
// handler.push(tokio::spawn(async move {
// let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
// }));
//
// // cli pipeline
// handler.push(tokio::spawn(async move {
// let _ = init_cli_pipeline().await;
// }));
//
// for i in handler {
// let _ = i.await;
// }
Ok(())
}
// todo: integration tests
// todo: config pulling mechanism rework (socket)
// todo: tasks management after killing all processes
// todo: