logic fixes + output opt
parent
a9c0aa14fa
commit
8ba911385f
|
|
@ -9,7 +9,6 @@ use std::sync::Arc;
|
||||||
use std::{env, fs};
|
use std::{env, fs};
|
||||||
use super::preboot::PrebootParams;
|
use super::preboot::PrebootParams;
|
||||||
use tokio::time::{Duration, sleep};
|
use tokio::time::{Duration, sleep};
|
||||||
// use redis::PubSub;
|
|
||||||
use tokio::sync::{
|
use tokio::sync::{
|
||||||
oneshot,
|
oneshot,
|
||||||
oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender },
|
oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender },
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ pub mod bus {
|
||||||
use noxis_cli::{Cli, metrics_models::MetricsMode};
|
use noxis_cli::{Cli, metrics_models::MetricsMode};
|
||||||
use crate::utils::metrics::MetricsExportable;
|
use crate::utils::metrics::MetricsExportable;
|
||||||
|
|
||||||
pub type BusMessageContent = Box<dyn BusContent >;
|
pub type BusMessageContent = Box<dyn BusContent>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum BusMessage {
|
pub enum BusMessage {
|
||||||
|
|
|
||||||
|
|
@ -19,9 +19,11 @@ impl Highway {
|
||||||
Self { to_cli, to_supervisor, to_metrics }
|
Self { to_cli, to_supervisor, to_metrics }
|
||||||
}
|
}
|
||||||
async fn send(&self, msg: BusMessage) -> anyhow::Result<()> {
|
async fn send(&self, msg: BusMessage) -> anyhow::Result<()> {
|
||||||
trace!("redirecting message - {:?} ...", &msg);
|
|
||||||
let dir = match &msg {
|
let dir = match &msg {
|
||||||
BusMessage::Request(dir, ..) | BusMessage::Response(dir, ..) => dir,
|
BusMessage::Request(dir, ..) | BusMessage::Response(dir, ..) => {
|
||||||
|
trace!("redirecting message to {:?} ...", dir);
|
||||||
|
dir
|
||||||
|
},
|
||||||
};
|
};
|
||||||
match dir {
|
match dir {
|
||||||
BusMessageDirection::ToCli => self.send_cli(msg).await,
|
BusMessageDirection::ToCli => self.send_cli(msg).await,
|
||||||
|
|
@ -58,9 +60,8 @@ impl Bus {
|
||||||
impl ProcessUnit for Bus {
|
impl ProcessUnit for Bus {
|
||||||
async fn process(&mut self) {
|
async fn process(&mut self) {
|
||||||
loop {
|
loop {
|
||||||
// TODO
|
|
||||||
while let Ok(content) = self.inner.try_recv() {
|
while let Ok(content) = self.inner.try_recv() {
|
||||||
debug!("new message to the Bus : {:?}", &content);
|
// debug!("new message to the Bus : {:?}", &content);
|
||||||
let msg = match content {
|
let msg = match content {
|
||||||
BusMessage::Request(direction, content_type, content) => {
|
BusMessage::Request(direction, content_type, content) => {
|
||||||
trace!("bus has got a new Request with direction {:?} and type {:?}", &direction, &content_type);
|
trace!("bus has got a new Request with direction {:?} and type {:?}", &direction, &content_type);
|
||||||
|
|
@ -75,7 +76,7 @@ impl ProcessUnit for Bus {
|
||||||
error!("Cannot redirect message : {}", er);
|
error!("Cannot redirect message : {}", er);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -168,18 +168,32 @@ pub mod v2 {
|
||||||
}
|
}
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> {
|
pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||||
|
if self.negative_events.is_empty() {
|
||||||
let pid = start_process(&self.name, &self.bin).await?;
|
let pid = start_process(&self.name, &self.bin).await?;
|
||||||
warn!("Process {} was started by user call ...", self.name);
|
warn!("Process {} was started by user call ...", self.name);
|
||||||
self.state = ProcessState::Pending;
|
self.state = ProcessState::Pending;
|
||||||
self.pid = Pid(pid);
|
self.pid = Pid(pid);
|
||||||
Ok(())
|
return Ok(())
|
||||||
|
} else {
|
||||||
|
warn!("Attempt to start process {} by user call was stopped due to existance of negative incidents ...", self.name);
|
||||||
|
return Err(anyhow::Error::msg(
|
||||||
|
format!("Attempt to start process {} by user call was stopped due to existance of negative incidents ...", self.name)
|
||||||
|
))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> {
|
pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||||
|
if self.negative_events.is_empty() {
|
||||||
unfreeze_process(&self.name).await?;
|
unfreeze_process(&self.name).await?;
|
||||||
warn!("Process {} was unfrozen by user call ...", self.name);
|
warn!("Process {} was unfrozen by user call ...", self.name);
|
||||||
self.state = ProcessState::Pending;
|
self.state = ProcessState::Pending;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
} else {
|
||||||
|
warn!("Attempt to unfreeze process {} by user call was stopped due to existance of negative incidents ...", self.name);
|
||||||
|
return Err(anyhow::Error::msg(
|
||||||
|
format!("Attempt to unfreeze process {} by user call was stopped due to existance of negative incidents ...", self.name)
|
||||||
|
))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub async fn restart_by_user_call(&mut self) -> anyhow::Result<()> {
|
pub async fn restart_by_user_call(&mut self) -> anyhow::Result<()> {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue