Compare commits
2 Commits
281841b68a
...
403285a937
| Author | SHA1 | Date |
|---|---|---|
|
|
403285a937 | |
|
|
a16eb78b79 |
|
|
@ -58,9 +58,7 @@ async fn main() -> anyhow::Result<()>{
|
|||
handler.push(ctrlc);
|
||||
|
||||
let monitoring = tokio::spawn(async move {
|
||||
let config = if !rx_brd.is_empty() {
|
||||
rx_brd.recv().await?
|
||||
} else {
|
||||
let config = {
|
||||
let mut tick = tokio::time::interval(Duration::from_millis(500));
|
||||
loop {
|
||||
tick.tick().await;
|
||||
|
|
|
|||
|
|
@ -36,38 +36,39 @@ pub mod v2 {
|
|||
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
|
||||
use super::*;
|
||||
|
||||
enum ControllerResult<'a> {
|
||||
Process(Option<ProcessesController<'a>>),
|
||||
File(Option<FilesController<'a>>),
|
||||
Service(Option<ServicesController<'a>>),
|
||||
enum ControllerResult {
|
||||
Process(Option<ProcessesController>),
|
||||
File(Option<FilesController>),
|
||||
Service(Option<ServicesController>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Supervisor<'a> {
|
||||
prcs : LinkedList<ProcessesController<'a>>,
|
||||
files : LinkedList<FilesController<'a>>,
|
||||
services : LinkedList<ServicesController<'a>>,
|
||||
struct Supervisor {
|
||||
prcs : LinkedList<ProcessesController>,
|
||||
files : LinkedList<FilesController>,
|
||||
services : LinkedList<ServicesController>,
|
||||
}
|
||||
|
||||
impl<'a> Supervisor<'a> {
|
||||
pub fn new() -> Supervisor<'a> {
|
||||
impl Supervisor {
|
||||
pub fn new() -> Supervisor {
|
||||
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
|
||||
}
|
||||
pub async fn with_config(mut self, config: &'a Processes) -> Supervisor<'a> {
|
||||
pub async fn with_config(mut self, config: &Processes) -> Supervisor {
|
||||
let _ = config.processes.iter()
|
||||
.for_each(|prc| {
|
||||
let (rx, tx) = mpsc::channel::<Events<'a>>(10);
|
||||
let (rx, tx) = mpsc::channel::<Events>(10);
|
||||
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
|
||||
if !self.prcs.contains(&temp) {
|
||||
self.prcs.push_back(temp);
|
||||
}
|
||||
let rx = Arc::new(rx);
|
||||
let proc_name: Arc<str> = Arc::from(prc.name.clone());
|
||||
|
||||
let _ = prc.dependencies.files.iter()
|
||||
.for_each(|file| {
|
||||
let mut hm = HashMap::new();
|
||||
let triggers = FileTriggersForController { on_change: &file.triggers.on_change, on_delete: &file.triggers.on_delete};
|
||||
hm.insert(prc.name.as_str(), (triggers, rx.clone()));
|
||||
let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())};
|
||||
hm.insert(proc_name.clone(), (triggers, rx.clone()));
|
||||
|
||||
let tempfile = FilesController::new(&file.filename.as_str(), hm)
|
||||
.with_path(&file.src);
|
||||
|
|
@ -90,23 +91,24 @@ pub mod v2 {
|
|||
let rx = rx.clone();
|
||||
let serv_cont = ServicesController::new().with_access_name(
|
||||
&serv.hostname,
|
||||
access_url
|
||||
&access_url
|
||||
);
|
||||
// triggers
|
||||
let triggers = Triggers::new_service(&serv.triggers.on_lost, serv.triggers.wait);
|
||||
let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone());
|
||||
let triggers = Triggers::new_service(arc, serv.triggers.wait);
|
||||
|
||||
if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
|
||||
proc.add_process(&prc.name, triggers, rx);
|
||||
} else {
|
||||
// vecdeque for queue
|
||||
let mut vec: VecDeque<&'a str> = VecDeque::new();
|
||||
vec.push_back(&prc.name);
|
||||
let mut vec: VecDeque<Arc<str>> = VecDeque::new();
|
||||
vec.push_back(proc_name.clone());
|
||||
// connection_queue
|
||||
let mut connection_queue: BTreeMap<u32, VecDeque<&'a str>> = BTreeMap::new();
|
||||
let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> = BTreeMap::new();
|
||||
connection_queue.insert(serv.triggers.wait, vec);
|
||||
// event_reg
|
||||
let mut hm = HashMap::new();
|
||||
hm.insert(prc.name.as_str(), (triggers, rx));
|
||||
hm.insert(proc_name.clone(), (triggers, rx));
|
||||
|
||||
let serv_cont = serv_cont.with_params(connection_queue, hm);
|
||||
self.services.push_back(serv_cont);
|
||||
|
|
@ -118,21 +120,48 @@ pub mod v2 {
|
|||
pub fn get_stats(&self) -> String {
|
||||
format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
|
||||
}
|
||||
async fn proccess_prc<T>(&mut self) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<'a> ProcessUnit<'a> for Supervisor<'a> {
|
||||
async fn process(&'a mut self) {
|
||||
impl ProcessUnit for Supervisor {
|
||||
async fn process(&mut self) {
|
||||
info!("Initializing monitoring ...");
|
||||
loop {
|
||||
// let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
|
||||
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
|
||||
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
|
||||
// let res = tokio::join!(prc.process(), file.process(), serv.process());
|
||||
if let Some(mut val) = self.prcs.pop_front() {
|
||||
tokio::spawn(async move {val.process().await;}).await;
|
||||
tasks.push(
|
||||
tokio::spawn( async move {
|
||||
val.process().await;
|
||||
ControllerResult::Process(Some(val))
|
||||
})
|
||||
);
|
||||
}
|
||||
if let Some(mut val) = self.files.pop_front() {
|
||||
tasks.push(
|
||||
tokio::spawn( async move {
|
||||
val.process().await;
|
||||
ControllerResult::File(Some(val))
|
||||
})
|
||||
);
|
||||
}
|
||||
if let Some(mut val) = self.services.pop_front() {
|
||||
tasks.push(
|
||||
tokio::spawn( async move {
|
||||
val.process().await;
|
||||
ControllerResult::Service(Some(val))
|
||||
})
|
||||
);
|
||||
}
|
||||
for task in tasks {
|
||||
match task.await {
|
||||
Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val),
|
||||
Ok(ControllerResult::File(Some(val))) => self.files.push_back(val),
|
||||
Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val),
|
||||
Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."),
|
||||
_ => { /* DEAD END (CAN NOT BE EXECUTED) */},
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
|
@ -149,7 +178,7 @@ pub mod v2 {
|
|||
) -> anyhow::Result<()> {
|
||||
let mut supervisor = Supervisor::new().with_config(&config).await;
|
||||
info!("Monitoring: {} ", &supervisor.get_stats());
|
||||
supervisor.process().await;
|
||||
supervisor.process().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ pub mod v2 {
|
|||
|
||||
type MpscSender = Arc<Sender<Events>>;
|
||||
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
|
||||
type EventHandlers = HashMap<String, (Triggers, MpscSender)>;
|
||||
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
|
||||
// type wrapper for service wait queue
|
||||
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
|
||||
|
||||
|
|
@ -92,7 +92,7 @@ pub mod v2 {
|
|||
});
|
||||
}
|
||||
// event add
|
||||
self.event_registrator.entry(proc_name.to_string()).or_insert((trigger, sender));
|
||||
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
|
||||
}
|
||||
async fn check_state(&self) -> anyhow::Result<()> {
|
||||
let mut addrs = self.access_url.to_socket_addrs()?;
|
||||
|
|
@ -148,7 +148,7 @@ pub mod v2 {
|
|||
for name in iterator {
|
||||
let proc_name = name.to_string();
|
||||
info!("Trying to notify process `{}` ...", &proc_name);
|
||||
let sender_opt = self.event_registrator.get(&proc_name)
|
||||
let sender_opt = self.event_registrator.get(&name)
|
||||
.map(|(trigger, sender)|
|
||||
(trigger.to_service_negative_event(name.clone()), sender)
|
||||
);
|
||||
|
|
|
|||
Loading…
Reference in New Issue