mod options; mod utils; use clap::Parser; use log::{error, info}; use options::config::*; use options::logger::setup_logger; use options::signals::set_valid_destructor; use options::structs::Processes; use options::cli_pipeline::init_cli_pipeline; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use utils::*; #[allow(unused_imports)] use options::preboot::PrebootParams; #[tokio::main(flavor = "multi_thread")] async fn main() { let preboot = PrebootParams::parse().validate(); if let Err(_) = preboot { return; } let _ = setup_logger(); info!("Runner is configurating..."); // setting up redis connection \ // then conf checks to choose the most actual \ let processes: Processes = get_actual_config().await.unwrap_or_else(|| { error!("No actual configuration for runner. Stopping..."); std::process::exit(101); }); info!( "Current runner configuration: {}", &processes.date_of_creation ); info!("Runner is ready. Initializing..."); if processes.processes.is_empty() { error!("Processes list is null, runner-rs initialization is stopped"); return; } let mut handler: Vec> = vec![]; // is in need to send to the signals handler thread let mut senders: Vec>> = vec![]; for proc in processes.processes.iter() { info!( "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", proc.name, proc.path, proc.dependencies.files.len(), proc.dependencies.services.len() ); // creating msg channel // can or should be executed in new thread let (tx, mut rx) = mpsc::channel::(1); let proc = Arc::new(proc.clone()); let tx = Arc::new(tx.clone()); senders.push(Arc::clone(&tx.clone())); let event = tokio::spawn(async move { run_daemons(proc.clone(), tx.clone(), &mut rx).await; }); handler.push(event); } // destructor addition handler.push(tokio::spawn(async move { if set_valid_destructor(Arc::new(senders)).await.is_err() { error!("Linux signals handler creation failed. Terminating main thread..."); return; } tokio::time::sleep(Duration::from_millis(200)).await; info!("End of job. Terminating main thread..."); std::process::exit(0); })); // remote config update subscription handler.push(tokio::spawn(async move { let _ = subscribe_config_stream(Arc::new(processes)).await; })); // cli pipeline handler.push(tokio::spawn(async move { let _ = init_cli_pipeline().await; })); for i in handler { let _ = i.await; } return; } // todo: integration tests // todo: config pulling mechanism rework (socket) // todo: tasks management after killing all processes // todo: