with bus prc manipulation
parent
e9f9bbc0b4
commit
c76b615341
|
|
@ -4,14 +4,16 @@ mod utils;
|
|||
use log::{error, info};
|
||||
use options::logger::setup_logger;
|
||||
use options::signals::set_valid_destructor;
|
||||
use options::structs::Processes;
|
||||
use options::structs::ProcessUnit;
|
||||
use options::structs::{Processes, bus::BusMessage};
|
||||
use options::cli_pipeline::init_cli_pipeline;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use options::preboot::PrebootParams;
|
||||
use tokio::sync::{broadcast, oneshot};
|
||||
use tokio::sync::{broadcast, oneshot, mpsc};
|
||||
use options::config::v2::init_config_mechanism;
|
||||
use utils::v2::init_monitoring;
|
||||
use utils::bus::Bus;
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn main() -> anyhow::Result<()>{
|
||||
|
|
@ -27,6 +29,25 @@ async fn main() -> anyhow::Result<()>{
|
|||
let (tx_oneshot, rx_oneshot) = oneshot::channel::<Processes>();
|
||||
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
||||
|
||||
// to BUS channel
|
||||
let (tx_to_bus, mut rx_to_bus) = mpsc::channel::<BusMessage>(5);
|
||||
// from BUS channels
|
||||
let (tx_to_cli, mut rx_to_cli) = mpsc::channel::<BusMessage>(5);
|
||||
let (tx_to_supervisor, mut rx_to_supervisor) = mpsc::channel::<BusMessage>(5);
|
||||
let (tx_to_metrics, mut rx_to_metrics) = mpsc::channel::<BusMessage>(5);
|
||||
|
||||
let tx_to_bus = Arc::new(tx_to_bus);
|
||||
let tx_to_cli = Arc::new(tx_to_cli);
|
||||
let tx_to_supervisor = Arc::new(tx_to_supervisor);
|
||||
let tx_to_metrics = Arc::new(tx_to_metrics);
|
||||
|
||||
let bus_module = tokio::spawn(async move {
|
||||
let mut bus = Bus::new(rx_to_bus, tx_to_cli.clone(), tx_to_supervisor.clone(), tx_to_metrics.clone());
|
||||
bus.process().await;
|
||||
error!("Info Bus crushed !");
|
||||
});
|
||||
handler.push(bus_module);
|
||||
|
||||
// initilaizing task for config manipulations
|
||||
let preboot_config = preboot.clone();
|
||||
let config_module = tokio::spawn(async move {
|
||||
|
|
@ -39,6 +60,7 @@ async fn main() -> anyhow::Result<()>{
|
|||
handler.push(config_module);
|
||||
|
||||
// initilaizing task for cli manipulation
|
||||
let tx_bus = tx_to_bus.clone();
|
||||
let preboot_cli = preboot.clone();
|
||||
let cli_module = tokio::spawn(async move {
|
||||
let config = {
|
||||
|
|
@ -54,7 +76,9 @@ async fn main() -> anyhow::Result<()>{
|
|||
if let Err(er) = init_cli_pipeline(
|
||||
preboot_cli,
|
||||
Arc::new(config),
|
||||
tx_oneshot
|
||||
tx_oneshot,
|
||||
rx_to_cli,
|
||||
tx_bus.clone()
|
||||
).await {
|
||||
error!("CLI pipeline failed due to {}", er)
|
||||
}
|
||||
|
|
@ -70,6 +94,7 @@ async fn main() -> anyhow::Result<()>{
|
|||
});
|
||||
handler.push(ctrlc);
|
||||
|
||||
let tx_bus = tx_to_bus.clone();
|
||||
let monitoring = tokio::spawn(async move {
|
||||
let config = {
|
||||
let mut tick = tokio::time::interval(Duration::from_millis(500));
|
||||
|
|
@ -81,7 +106,11 @@ async fn main() -> anyhow::Result<()>{
|
|||
}
|
||||
}
|
||||
};
|
||||
if let Err(er) = init_monitoring(config).await {
|
||||
if let Err(er) = init_monitoring(
|
||||
config,
|
||||
rx_to_supervisor,
|
||||
tx_bus
|
||||
).await {
|
||||
error!("Monitoring mod failed due to {}", er);
|
||||
}
|
||||
});
|
||||
|
|
@ -90,80 +119,5 @@ async fn main() -> anyhow::Result<()>{
|
|||
for i in handler {
|
||||
let _ = i.await;
|
||||
}
|
||||
|
||||
// setting up redis connection \
|
||||
// then conf checks to choose the most actual \
|
||||
// let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
|
||||
// error!("No actual configuration for runner. Stopping...");
|
||||
// std::process::exit(1);
|
||||
// });
|
||||
//
|
||||
// info!(
|
||||
// "Current runner configuration: {}",
|
||||
// &processes.date_of_creation
|
||||
// );
|
||||
// info!("Runner is ready. Initializing...");
|
||||
//
|
||||
// if processes.processes.is_empty() {
|
||||
// error!("Processes list is null, runner-rs initialization is stopped");
|
||||
// return Err(Error::msg("Empty processes segment in config"));
|
||||
// }
|
||||
// let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
||||
// // is in need to send to the signals handler thread
|
||||
// let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
|
||||
//
|
||||
// for proc in processes.processes.iter() {
|
||||
// info!(
|
||||
// "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
|
||||
// proc.name,
|
||||
// proc.path,
|
||||
// proc.dependencies.files.len(),
|
||||
// proc.dependencies.services.len()
|
||||
// );
|
||||
//
|
||||
// // creating msg channel
|
||||
// // can or should be executed in new thread
|
||||
// let (tx, mut rx) = mpsc::channel::<u8>(1);
|
||||
// let proc = Arc::new(proc.clone());
|
||||
// let tx = Arc::new(tx.clone());
|
||||
//
|
||||
// senders.push(Arc::clone(&tx.clone()));
|
||||
//
|
||||
// let event = tokio::spawn(async move {
|
||||
// run_daemons(proc.clone(), tx.clone(), &mut rx).await;
|
||||
// });
|
||||
// handler.push(event);
|
||||
// }
|
||||
//
|
||||
// // destructor addition
|
||||
// handler.push(tokio::spawn(async move {
|
||||
// if set_valid_destructor(Arc::new(senders)).await.is_err() {
|
||||
// error!("Linux signals handler creation failed. Terminating main thread...");
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
// info!("End of job. Terminating main thread...");
|
||||
// std::process::exit(0);
|
||||
// }));
|
||||
//
|
||||
// // remote config update subscription
|
||||
// handler.push(tokio::spawn(async move {
|
||||
// let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
|
||||
// }));
|
||||
//
|
||||
// // cli pipeline
|
||||
// handler.push(tokio::spawn(async move {
|
||||
// let _ = init_cli_pipeline().await;
|
||||
// }));
|
||||
//
|
||||
// for i in handler {
|
||||
// let _ = i.await;
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// todo: integration tests
|
||||
// todo: config pulling mechanism rework (socket)
|
||||
// todo: tasks management after killing all processes
|
||||
// todo:
|
||||
}
|
||||
|
|
@ -2,16 +2,23 @@ use log::{error, info};
|
|||
use tokio::net::{ UnixStream, UnixListener };
|
||||
use tokio::sync::{Mutex, OnceCell};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use std::any::Any;
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{ AsyncWriteExt, AsyncReadExt};
|
||||
use noxis_cli::Cli;
|
||||
use noxis_cli::{Cli, ProcessAction};
|
||||
use crate::options::structs::bus::InternalCli;
|
||||
|
||||
use super::structs::Processes;
|
||||
use super::structs::bus::BusMessage;
|
||||
|
||||
use super::preboot::PrebootParams;
|
||||
|
||||
type ConfigGateway = tokio::sync::oneshot::Sender<Processes>;
|
||||
type ProcessedConfigGateway = Arc<Mutex<OnceCell<ConfigGateway>>>;
|
||||
type BusReciever = tokio::sync::mpsc::Receiver<BusMessage>;
|
||||
type BusSender = Arc<tokio::sync::mpsc::Sender<BusMessage>>;
|
||||
type ReadyBusReciever = Arc<Mutex<tokio::sync::mpsc::Receiver<BusMessage>>>;
|
||||
|
||||
/// # Fn `init_cli_pipeline`
|
||||
/// ## for catching all input requests from CLI
|
||||
|
|
@ -30,6 +37,8 @@ pub async fn init_cli_pipeline(
|
|||
params: Arc<PrebootParams>,
|
||||
config : Arc<Processes>,
|
||||
config_gateway : ConfigGateway,
|
||||
bus_reciever : BusReciever,
|
||||
bus_sender : BusSender,
|
||||
) -> anyhow::Result<()> {
|
||||
let socket_path = ¶ms.self_socket;
|
||||
let _ = fs::remove_file(socket_path);
|
||||
|
|
@ -39,6 +48,7 @@ pub async fn init_cli_pipeline(
|
|||
OnceCell::new_with(Some(config_gateway))
|
||||
)
|
||||
);
|
||||
let bus_reciever = Arc::new(Mutex::new(bus_reciever));
|
||||
|
||||
match UnixListener::bind(socket_path) {
|
||||
Ok(list) => {
|
||||
|
|
@ -52,8 +62,10 @@ pub async fn init_cli_pipeline(
|
|||
let params = params.clone();
|
||||
let config = config.clone();
|
||||
let config_gateway = config_gateway.clone();
|
||||
let bus_reciever = bus_reciever.clone();
|
||||
let bus_sender = bus_sender.clone();
|
||||
tokio::spawn(async move {
|
||||
process_connection(socket, params.clone(), config.clone(), config_gateway.clone()).await;
|
||||
process_connection(socket, params.clone(), config.clone(), config_gateway.clone(), bus_reciever, bus_sender).await;
|
||||
});
|
||||
},
|
||||
Err(er) => {
|
||||
|
|
@ -84,7 +96,14 @@ pub async fn init_cli_pipeline(
|
|||
///
|
||||
/// *depends on* : `tokio::net::TcpStream`
|
||||
///
|
||||
async fn process_connection(mut stream: UnixStream, params: Arc<PrebootParams>, config : Arc<Processes>, cfg_gateway : ProcessedConfigGateway) {
|
||||
async fn process_connection(
|
||||
mut stream: UnixStream,
|
||||
params: Arc<PrebootParams>,
|
||||
config : Arc<Processes>,
|
||||
cfg_gateway : ProcessedConfigGateway,
|
||||
bus_reciever : ReadyBusReciever,
|
||||
bus_sender : BusSender,
|
||||
) {
|
||||
let mut buf = vec![0; 1024];
|
||||
match stream.read(&mut buf).await {
|
||||
Ok(0) => {
|
||||
|
|
@ -96,7 +115,7 @@ async fn process_connection(mut stream: UnixStream, params: Arc<PrebootParams>,
|
|||
match serde_json::from_slice::<Cli>(&buf) {
|
||||
Ok(cli) => {
|
||||
info!("Received CLI request: {:?}", cli);
|
||||
let response = match process_cli_cmd(cli, params.clone(), config, cfg_gateway.clone()).await {
|
||||
let response = match process_cli_cmd(cli, params.clone(), config, cfg_gateway.clone(), bus_reciever.clone(), bus_sender.clone()).await {
|
||||
Ok(response) => {
|
||||
response
|
||||
},
|
||||
|
|
@ -121,7 +140,14 @@ async fn process_connection(mut stream: UnixStream, params: Arc<PrebootParams>,
|
|||
}
|
||||
|
||||
|
||||
async fn process_cli_cmd(cli : Cli, params: Arc<PrebootParams>, global_config : Arc<Processes>, cfg_gateway: ProcessedConfigGateway) -> anyhow::Result<String> {
|
||||
async fn process_cli_cmd(
|
||||
cli : Cli,
|
||||
params: Arc<PrebootParams>,
|
||||
global_config : Arc<Processes>,
|
||||
cfg_gateway: ProcessedConfigGateway,
|
||||
bus_reciever : ReadyBusReciever,
|
||||
bus_sender : BusSender,
|
||||
) -> anyhow::Result<String> {
|
||||
use noxis_cli::{Commands, ConfigAction};
|
||||
return match cli.command {
|
||||
Commands::Config(config) => {
|
||||
|
|
@ -170,6 +196,78 @@ async fn process_cli_cmd(cli : Cli, params: Arc<PrebootParams>, global_config :
|
|||
// _ => Err(anyhow::Error::msg("Unrecognized command from CLI"))
|
||||
}
|
||||
},
|
||||
Commands::Process(prc) => {
|
||||
use crate::options::structs::bus::{BusMessageDirection, BusMessageContentType, CLiCommand};
|
||||
|
||||
let proc_name = prc.process;
|
||||
let req = BusMessage::Request(
|
||||
BusMessageDirection::ToSupervisor,
|
||||
BusMessageContentType::Cli,
|
||||
Box::new(
|
||||
match prc.action {
|
||||
ProcessAction::Start => {
|
||||
InternalCli {
|
||||
prc : proc_name,
|
||||
cmd : CLiCommand::Start,
|
||||
}
|
||||
},
|
||||
ProcessAction::Stop => {
|
||||
InternalCli {
|
||||
prc : proc_name,
|
||||
cmd : CLiCommand::Stop,
|
||||
}
|
||||
},
|
||||
ProcessAction::Restart => {
|
||||
InternalCli {
|
||||
prc : proc_name,
|
||||
cmd : CLiCommand::Restart,
|
||||
}
|
||||
},
|
||||
ProcessAction::Freeze => {
|
||||
InternalCli {
|
||||
prc : proc_name,
|
||||
cmd : CLiCommand::Freeze,
|
||||
}
|
||||
},
|
||||
ProcessAction::Unfreeze => {
|
||||
InternalCli {
|
||||
prc : proc_name,
|
||||
cmd : CLiCommand::Unfreeze,
|
||||
}
|
||||
},
|
||||
/* TODO: ALL CMDS */
|
||||
_ => {
|
||||
InternalCli {
|
||||
prc : proc_name,
|
||||
cmd : CLiCommand::Restart,
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
);
|
||||
let mut bus = bus_reciever.lock().await;
|
||||
bus_sender.send(req).await?;
|
||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||
let resp = tokio::time::timeout(std::time::Duration::from_secs(10), async move {
|
||||
loop {
|
||||
if let Ok(cont) = bus.try_recv() {
|
||||
return cont
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
||||
}
|
||||
}).await?;
|
||||
|
||||
if let BusMessage::Response(_, _, content) = resp {
|
||||
let content: Box<dyn Any> = content;
|
||||
if let Ok(resp) = content.downcast::<String>() {
|
||||
return Ok(*resp)
|
||||
} else {
|
||||
// TODO : REWRITE THIS
|
||||
return Ok(String::from("OK"));
|
||||
}
|
||||
}
|
||||
Ok(String::from("OK"))
|
||||
},
|
||||
/* */
|
||||
Commands::Status => Ok(String::from("Ok")),
|
||||
_ => Ok(String::from("Ok"))
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use std::net::Ipv4Addr;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use async_trait::async_trait;
|
||||
use std::sync::Arc;
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
pub mod bus {
|
||||
use std::fmt::Debug;
|
||||
|
|
@ -35,7 +35,22 @@ pub mod bus {
|
|||
MetricsObj,
|
||||
}
|
||||
|
||||
pub trait BusContent: Send + Sync + 'static + Debug {
|
||||
#[derive(Debug)]
|
||||
pub enum CLiCommand {
|
||||
Start,
|
||||
Stop,
|
||||
Restart,
|
||||
Freeze,
|
||||
Unfreeze
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InternalCli {
|
||||
pub prc : String,
|
||||
pub cmd : CLiCommand,
|
||||
}
|
||||
|
||||
pub trait BusContent: Send + Sync + 'static + Debug + Any {
|
||||
fn get_bus_type(&self) -> BusMessageContentType;
|
||||
}
|
||||
|
||||
|
|
@ -49,11 +64,16 @@ pub mod bus {
|
|||
BusMessageContentType::Cli
|
||||
}
|
||||
}
|
||||
impl BusContent for dyn MetricsExportable {
|
||||
impl BusContent for InternalCli {
|
||||
fn get_bus_type(&self) -> BusMessageContentType {
|
||||
BusMessageContentType::Cli
|
||||
}
|
||||
}
|
||||
impl BusContent for dyn MetricsExportable {
|
||||
fn get_bus_type(&self) -> BusMessageContentType {
|
||||
BusMessageContentType::MetricsObj
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ use files::v2::FilesController;
|
|||
use services::v2::ServicesController;
|
||||
use async_trait::async_trait;
|
||||
use lazy_static::lazy_static;
|
||||
use crate::options::structs::bus::{BusMessage, InternalCli, BusMessageContentType};
|
||||
|
||||
lazy_static! {
|
||||
static ref GET_ID_CMD : &'static str = "hostname";
|
||||
|
|
@ -25,8 +26,12 @@ lazy_static! {
|
|||
|
||||
pub mod v2 {
|
||||
use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
|
||||
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
|
||||
use crate::options::structs::{bus::CLiCommand, Events, FileTriggersForController, ProcessUnit, Triggers};
|
||||
use super::*;
|
||||
use std::any::Any;
|
||||
|
||||
type BusReciever = tokio::sync::mpsc::Receiver<BusMessage>;
|
||||
type BusSender = Arc<tokio::sync::mpsc::Sender<BusMessage>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ControllerResult {
|
||||
|
|
@ -41,11 +46,12 @@ pub mod v2 {
|
|||
files : LinkedList<FilesController>,
|
||||
services : LinkedList<ServicesController>,
|
||||
config : Arc<Processes>,
|
||||
bus : (BusReciever, BusSender),
|
||||
}
|
||||
|
||||
impl Supervisor {
|
||||
pub fn new() -> Supervisor {
|
||||
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()) }
|
||||
pub fn new(bus_reciever : BusReciever, bus_sender: BusSender) -> Supervisor {
|
||||
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()), bus : (bus_reciever, bus_sender) }
|
||||
}
|
||||
pub async fn with_config(mut self, config: Processes) -> Supervisor {
|
||||
self.config = Arc::from(config);
|
||||
|
|
@ -123,12 +129,61 @@ pub mod v2 {
|
|||
info!("Initializing monitoring ...");
|
||||
loop {
|
||||
//
|
||||
// todo: CHANNEL check and reaction
|
||||
//
|
||||
// dbg!(&self);
|
||||
let rec = &mut self.bus.0;
|
||||
while let Ok(request) = rec.try_recv(){
|
||||
if let BusMessage::Request(_, _, cont) = request {
|
||||
let cont: Box<dyn Any + Send> = cont;
|
||||
if let Ok(cli) = cont.downcast::<InternalCli>() {
|
||||
let mut count = 0;
|
||||
let fut = (&mut self.prcs).into_iter()
|
||||
.find(|prc| prc.name == Arc::from(cli.prc.as_ref()))
|
||||
.map(|prc| async {
|
||||
let count = &mut count;
|
||||
*count += 1;
|
||||
let res = match cli.cmd {
|
||||
CLiCommand::Start => {
|
||||
prc.start_by_user_call().await
|
||||
},
|
||||
CLiCommand::Stop => {
|
||||
prc.stop_by_user_call().await
|
||||
},
|
||||
CLiCommand::Restart => {
|
||||
prc.restart_by_user_call().await
|
||||
},
|
||||
CLiCommand::Freeze => {
|
||||
prc.freeze_by_user_call().await
|
||||
},
|
||||
CLiCommand::Unfreeze => {
|
||||
prc.unfreeze_by_user_call().await
|
||||
},
|
||||
};
|
||||
let sender = self.bus.1.clone();
|
||||
let resp_content = match res {
|
||||
Ok(_) => format!("Ok on user call abour process {}", prc.name),
|
||||
Err(er) => format!("Error: User call for process {} failed : {}", prc.name, er),
|
||||
};
|
||||
let _ = sender.send(BusMessage::Response(
|
||||
crate::options::structs::bus::BusMessageDirection::ToCli,
|
||||
BusMessageContentType::RawString,
|
||||
Box::new(resp_content)
|
||||
)).await;
|
||||
1
|
||||
});
|
||||
if let Some(fut) = fut {
|
||||
fut.await;
|
||||
}
|
||||
else {
|
||||
let _ = self.bus.1.clone().send(BusMessage::Response(
|
||||
crate::options::structs::bus::BusMessageDirection::ToCli,
|
||||
BusMessageContentType::RawString,
|
||||
Box::new(format!("No process named `{}` was found in controlled scope", cli.prc))
|
||||
)).await;
|
||||
}
|
||||
}
|
||||
// TODO: GET PRCS METRICS DOWNCASTING
|
||||
}
|
||||
}
|
||||
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
|
||||
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
|
||||
// let res = tokio::join!(prc.process(), file.process(), serv.process());
|
||||
if let Some(mut val) = self.prcs.pop_front() {
|
||||
tasks.push(
|
||||
tokio::spawn( async move {
|
||||
|
|
@ -167,15 +222,12 @@ pub mod v2 {
|
|||
}
|
||||
}
|
||||
|
||||
// spawn tasks
|
||||
// spawn prc
|
||||
// spawn files
|
||||
// spawn services
|
||||
// ## for ... i.await in loop
|
||||
pub async fn init_monitoring(
|
||||
config: Processes
|
||||
config: Processes,
|
||||
bus_reciever : BusReciever,
|
||||
bus_sender : BusSender,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut supervisor = Supervisor::new().with_config(config).await;
|
||||
let mut supervisor = Supervisor::new(bus_reciever, bus_sender).with_config(config).await;
|
||||
info!("Monitoring: {} ", &supervisor.get_stats());
|
||||
supervisor.process().await;
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ impl Highway {
|
|||
Self { to_cli, to_supervisor, to_metrics }
|
||||
}
|
||||
async fn send(&self, msg: BusMessage) -> anyhow::Result<()> {
|
||||
trace!("redirecting message - {:?} ...", &msg);
|
||||
let dir = match &msg {
|
||||
BusMessage::Request(dir, ..) | BusMessage::Response(dir, ..) => dir,
|
||||
};
|
||||
|
|
@ -48,7 +49,7 @@ pub struct Bus {
|
|||
}
|
||||
|
||||
impl Bus {
|
||||
fn new(inner: Inner, to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self {
|
||||
pub fn new(inner: Inner, to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self {
|
||||
Self { inner, highway: Highway::new(to_cli, to_supervisor, to_metrics) }
|
||||
}
|
||||
}
|
||||
|
|
@ -59,14 +60,14 @@ impl ProcessUnit for Bus {
|
|||
loop {
|
||||
// TODO
|
||||
while let Ok(content) = self.inner.try_recv() {
|
||||
debug!("New message to the Bus : {:?}", &content);
|
||||
debug!("new message to the Bus : {:?}", &content);
|
||||
let msg = match content {
|
||||
BusMessage::Request(direction, content_type, content) => {
|
||||
trace!("Bus has got a new Request with direction {:?} and type {:?}", &direction, &content_type);
|
||||
BusMessage::Response(direction, content_type, content)
|
||||
trace!("bus has got a new Request with direction {:?} and type {:?}", &direction, &content_type);
|
||||
BusMessage::Request(direction, content_type, content)
|
||||
},
|
||||
BusMessage::Response(direction, content_type, content) => {
|
||||
trace!("Bus has got a new Response with direction {:?} and type {:?}", &direction, &content_type);
|
||||
trace!("bus has got a new Response with direction {:?} and type {:?}", &direction, &content_type);
|
||||
BusMessage::Response(direction, content_type, content)
|
||||
},
|
||||
};
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ use crate::options::structs::Dependencies;
|
|||
use serde::Serialize;
|
||||
use super::prcs::v2::Pid;
|
||||
use std::fmt::Debug;
|
||||
use crate::options::structs::bus::BusMessage;
|
||||
// use pcap::{Device, Capture, Active};
|
||||
// use std::net::Ipv4Addr;
|
||||
// use anyhow::{Result, Ok};
|
||||
|
|
@ -19,6 +20,134 @@ type CoreUsage = BTreeMap<usize, CoreInfo>;
|
|||
type Disks = Vec<Disk>;
|
||||
type Ifaces = Vec<Network>;
|
||||
pub type MetricProcesses = Vec<ProcessExtended>;
|
||||
type BusReciever = tokio::sync::mpsc::Receiver<BusMessage>;
|
||||
type BusSender = Arc<tokio::sync::mpsc::Sender<BusMessage>>;
|
||||
|
||||
/// # Fn `init_metrics_grubber`
|
||||
/// ## for initializing process of unstoppable grubbing metrics.
|
||||
///
|
||||
/// *input* : `Arc<Mutex<UnixSocket>>` ??
|
||||
///
|
||||
/// *output* : `Err` if it cant create grubbers | `Ok` on finish
|
||||
///
|
||||
/// *initiator* : main thread ??
|
||||
///
|
||||
/// *managing* : object of unix-socket reader
|
||||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
#[allow(dead_code)]
|
||||
pub async fn init_metrics_grubber(
|
||||
/* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */
|
||||
) {
|
||||
let mut system = System::new();
|
||||
get_all_metrics(&mut system).await;
|
||||
}
|
||||
|
||||
async fn get_all_metrics(system: &mut System) {
|
||||
system.refresh_all();
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
dbg!(get_host_info().await);
|
||||
dbg!(get_cpu_metrics(system).await);
|
||||
dbg!(get_ram_metrics(system).await);
|
||||
dbg!(get_all_disks_metrics().await);
|
||||
dbg!(get_all_ifaces_metrics().await);
|
||||
}
|
||||
|
||||
async fn get_host_info() -> HostInfo {
|
||||
HostInfo {
|
||||
hostname : System::host_name().unwrap_or_default(),
|
||||
os : System::long_os_version().unwrap_or_default(),
|
||||
kernel : System::kernel_version().unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_cpu_metrics(system: &mut System) -> Cpu {
|
||||
system.refresh_cpu_all();
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
|
||||
let mut buffer = CoreUsage::new();
|
||||
let global_usage = system.global_cpu_usage();
|
||||
|
||||
system.cpus()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.for_each(|(id, cpu)| {
|
||||
let core_info = CoreInfo {
|
||||
// id,
|
||||
brand : cpu.brand().to_string(),
|
||||
name : cpu.name().to_string(),
|
||||
frequency : cpu.frequency(),
|
||||
vendor_id : cpu.vendor_id().to_string(),
|
||||
usage : cpu.cpu_usage(),
|
||||
};
|
||||
// buffer.push(core_info);
|
||||
buffer.entry(id).or_insert(core_info);
|
||||
});
|
||||
|
||||
Cpu {
|
||||
global_usage,
|
||||
usage: buffer
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_ram_metrics(system: &mut System) -> Ram {
|
||||
system.refresh_memory();
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
|
||||
Ram {
|
||||
free_mem : system.free_memory(),
|
||||
free_swap : system.free_swap(),
|
||||
total_mem : system.total_memory(),
|
||||
total_swap : system.total_swap(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_all_disks_metrics() -> Disks {
|
||||
let disks = DisksList::new_with_refreshed_list();
|
||||
let mut buffer = Disks::new();
|
||||
disks.list()
|
||||
.iter()
|
||||
.for_each(|disk| {
|
||||
let disk = Disk {
|
||||
name : disk.name().to_string_lossy().into_owned(),
|
||||
kind: disk.kind().to_string(),
|
||||
fs : disk.file_system().to_string_lossy().into_owned(),
|
||||
mount_point : disk.mount_point().to_string_lossy().into_owned(),
|
||||
total_space : disk.total_space(),
|
||||
available_space : disk.available_space(),
|
||||
is_removable : disk.is_removable(),
|
||||
is_readonly : disk.is_read_only()
|
||||
};
|
||||
buffer.push(disk);
|
||||
});
|
||||
buffer
|
||||
}
|
||||
|
||||
async fn get_all_ifaces_metrics() -> Ifaces {
|
||||
let mut ifaces = Ifaces::new();
|
||||
let networks = Networks::new_with_refreshed_list();
|
||||
networks.iter()
|
||||
.for_each(|(iface_name, data)| {
|
||||
let mac = data.mac_address().to_string();
|
||||
let iface = Network {
|
||||
iname : iface_name.to_owned(),
|
||||
mac : mac,
|
||||
recieved : data.received(),
|
||||
transmitted : data.transmitted(),
|
||||
total_recieved_bytes : data.total_received(),
|
||||
total_transmitted_bytes : data.total_transmitted(),
|
||||
total_recieved_packets : data.total_packets_received(),
|
||||
total_transmitted_packets : data.total_packets_transmitted(),
|
||||
errors_on_recieved : data.errors_on_received(),
|
||||
errors_on_transmitted : data.errors_on_transmitted(),
|
||||
};
|
||||
ifaces.push(iface);
|
||||
});
|
||||
ifaces
|
||||
}
|
||||
|
||||
async fn get_all_processes_metrics(system: &mut System) {}
|
||||
|
||||
pub enum MetricType {
|
||||
FullMetrics,
|
||||
|
|
@ -190,132 +319,6 @@ impl MetricsExportable for MetricProcesses {
|
|||
}
|
||||
}
|
||||
|
||||
/// # Fn `init_metrics_grubber`
|
||||
/// ## for initializing process of unstoppable grubbing metrics.
|
||||
///
|
||||
/// *input* : `Arc<Mutex<UnixSocket>>` ??
|
||||
///
|
||||
/// *output* : `Err` if it cant create grubbers | `Ok` on finish
|
||||
///
|
||||
/// *initiator* : main thread ??
|
||||
///
|
||||
/// *managing* : object of unix-socket reader
|
||||
///
|
||||
/// *depends on* : -
|
||||
///
|
||||
#[allow(dead_code)]
|
||||
pub async fn init_metrics_grubber(
|
||||
/* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */
|
||||
) {
|
||||
let mut system = System::new();
|
||||
get_all_metrics(&mut system).await;
|
||||
}
|
||||
|
||||
async fn get_all_metrics(system: &mut System) {
|
||||
system.refresh_all();
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
dbg!(get_host_info().await);
|
||||
dbg!(get_cpu_metrics(system).await);
|
||||
dbg!(get_ram_metrics(system).await);
|
||||
dbg!(get_all_disks_metrics().await);
|
||||
dbg!(get_all_ifaces_metrics().await);
|
||||
}
|
||||
|
||||
async fn get_host_info() -> HostInfo {
|
||||
HostInfo {
|
||||
hostname : System::host_name().unwrap_or_default(),
|
||||
os : System::long_os_version().unwrap_or_default(),
|
||||
kernel : System::kernel_version().unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_cpu_metrics(system: &mut System) -> Cpu {
|
||||
system.refresh_cpu_all();
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
|
||||
let mut buffer = CoreUsage::new();
|
||||
let global_usage = system.global_cpu_usage();
|
||||
|
||||
system.cpus()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.for_each(|(id, cpu)| {
|
||||
let core_info = CoreInfo {
|
||||
// id,
|
||||
brand : cpu.brand().to_string(),
|
||||
name : cpu.name().to_string(),
|
||||
frequency : cpu.frequency(),
|
||||
vendor_id : cpu.vendor_id().to_string(),
|
||||
usage : cpu.cpu_usage(),
|
||||
};
|
||||
// buffer.push(core_info);
|
||||
buffer.entry(id).or_insert(core_info);
|
||||
});
|
||||
|
||||
Cpu {
|
||||
global_usage,
|
||||
usage: buffer
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_ram_metrics(system: &mut System) -> Ram {
|
||||
system.refresh_memory();
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
|
||||
Ram {
|
||||
free_mem : system.free_memory(),
|
||||
free_swap : system.free_swap(),
|
||||
total_mem : system.total_memory(),
|
||||
total_swap : system.total_swap(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_all_disks_metrics() -> Disks {
|
||||
let disks = DisksList::new_with_refreshed_list();
|
||||
let mut buffer = Disks::new();
|
||||
disks.list()
|
||||
.iter()
|
||||
.for_each(|disk| {
|
||||
let disk = Disk {
|
||||
name : disk.name().to_string_lossy().into_owned(),
|
||||
kind: disk.kind().to_string(),
|
||||
fs : disk.file_system().to_string_lossy().into_owned(),
|
||||
mount_point : disk.mount_point().to_string_lossy().into_owned(),
|
||||
total_space : disk.total_space(),
|
||||
available_space : disk.available_space(),
|
||||
is_removable : disk.is_removable(),
|
||||
is_readonly : disk.is_read_only()
|
||||
};
|
||||
buffer.push(disk);
|
||||
});
|
||||
buffer
|
||||
}
|
||||
|
||||
async fn get_all_ifaces_metrics() -> Ifaces {
|
||||
let mut ifaces = Ifaces::new();
|
||||
let networks = Networks::new_with_refreshed_list();
|
||||
networks.iter()
|
||||
.for_each(|(iface_name, data)| {
|
||||
let mac = data.mac_address().to_string();
|
||||
let iface = Network {
|
||||
iname : iface_name.to_owned(),
|
||||
mac : mac,
|
||||
recieved : data.received(),
|
||||
transmitted : data.transmitted(),
|
||||
total_recieved_bytes : data.total_received(),
|
||||
total_transmitted_bytes : data.total_transmitted(),
|
||||
total_recieved_packets : data.total_packets_received(),
|
||||
total_transmitted_packets : data.total_packets_transmitted(),
|
||||
errors_on_recieved : data.errors_on_received(),
|
||||
errors_on_transmitted : data.errors_on_transmitted(),
|
||||
};
|
||||
ifaces.push(iface);
|
||||
});
|
||||
ifaces
|
||||
}
|
||||
|
||||
async fn get_all_processes_metrics(system: &mut System) {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod metrics_unittets {
|
||||
#[tokio::test]
|
||||
|
|
|
|||
|
|
@ -154,6 +154,7 @@ pub mod v2 {
|
|||
#[allow(unused)]
|
||||
pub async fn stop_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
terminate_process(&self.name).await?;
|
||||
warn!("Process {} was stopped by user call ...", self.name);
|
||||
self.state = ProcessState::StoppedByCli;
|
||||
self.pid = Pid::new();
|
||||
Ok(())
|
||||
|
|
@ -161,12 +162,14 @@ pub mod v2 {
|
|||
#[allow(unused)]
|
||||
pub async fn freeze_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
freeze_process(&self.name).await?;
|
||||
warn!("Process {} was frozen by user call ...", self.name);
|
||||
self.state = ProcessState::HoldingByCli;
|
||||
Ok(())
|
||||
}
|
||||
#[allow(unused)]
|
||||
pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
let pid = start_process(&self.name, &self.bin).await?;
|
||||
warn!("Process {} was started by user call ...", self.name);
|
||||
self.state = ProcessState::Pending;
|
||||
self.pid = Pid(pid);
|
||||
Ok(())
|
||||
|
|
@ -174,12 +177,14 @@ pub mod v2 {
|
|||
#[allow(unused)]
|
||||
pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
unfreeze_process(&self.name).await?;
|
||||
warn!("Process {} was unfrozen by user call ...", self.name);
|
||||
self.state = ProcessState::Pending;
|
||||
Ok(())
|
||||
}
|
||||
#[allow(unused)]
|
||||
pub async fn restart_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||
let pid = restart_process(&self.name, &self.bin).await?;
|
||||
warn!("Process {} was restarted by user call ...", self.name);
|
||||
self.pid = Pid(pid);
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue