OWNERSHIP FIX 3
parent
a16eb78b79
commit
403285a937
|
|
@ -58,9 +58,7 @@ async fn main() -> anyhow::Result<()>{
|
||||||
handler.push(ctrlc);
|
handler.push(ctrlc);
|
||||||
|
|
||||||
let monitoring = tokio::spawn(async move {
|
let monitoring = tokio::spawn(async move {
|
||||||
let config = if !rx_brd.is_empty() {
|
let config = {
|
||||||
rx_brd.recv().await?
|
|
||||||
} else {
|
|
||||||
let mut tick = tokio::time::interval(Duration::from_millis(500));
|
let mut tick = tokio::time::interval(Duration::from_millis(500));
|
||||||
loop {
|
loop {
|
||||||
tick.tick().await;
|
tick.tick().await;
|
||||||
|
|
|
||||||
|
|
@ -36,38 +36,39 @@ pub mod v2 {
|
||||||
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
|
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
enum ControllerResult<'a> {
|
enum ControllerResult {
|
||||||
Process(Option<ProcessesController<'a>>),
|
Process(Option<ProcessesController>),
|
||||||
File(Option<FilesController<'a>>),
|
File(Option<FilesController>),
|
||||||
Service(Option<ServicesController<'a>>),
|
Service(Option<ServicesController>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct Supervisor<'a> {
|
struct Supervisor {
|
||||||
prcs : LinkedList<ProcessesController<'a>>,
|
prcs : LinkedList<ProcessesController>,
|
||||||
files : LinkedList<FilesController<'a>>,
|
files : LinkedList<FilesController>,
|
||||||
services : LinkedList<ServicesController<'a>>,
|
services : LinkedList<ServicesController>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Supervisor<'a> {
|
impl Supervisor {
|
||||||
pub fn new() -> Supervisor<'a> {
|
pub fn new() -> Supervisor {
|
||||||
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
|
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
|
||||||
}
|
}
|
||||||
pub async fn with_config(mut self, config: &'a Processes) -> Supervisor<'a> {
|
pub async fn with_config(mut self, config: &Processes) -> Supervisor {
|
||||||
let _ = config.processes.iter()
|
let _ = config.processes.iter()
|
||||||
.for_each(|prc| {
|
.for_each(|prc| {
|
||||||
let (rx, tx) = mpsc::channel::<Events<'a>>(10);
|
let (rx, tx) = mpsc::channel::<Events>(10);
|
||||||
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
|
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
|
||||||
if !self.prcs.contains(&temp) {
|
if !self.prcs.contains(&temp) {
|
||||||
self.prcs.push_back(temp);
|
self.prcs.push_back(temp);
|
||||||
}
|
}
|
||||||
let rx = Arc::new(rx);
|
let rx = Arc::new(rx);
|
||||||
|
let proc_name: Arc<str> = Arc::from(prc.name.clone());
|
||||||
|
|
||||||
let _ = prc.dependencies.files.iter()
|
let _ = prc.dependencies.files.iter()
|
||||||
.for_each(|file| {
|
.for_each(|file| {
|
||||||
let mut hm = HashMap::new();
|
let mut hm = HashMap::new();
|
||||||
let triggers = FileTriggersForController { on_change: &file.triggers.on_change, on_delete: &file.triggers.on_delete};
|
let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())};
|
||||||
hm.insert(prc.name.as_str(), (triggers, rx.clone()));
|
hm.insert(proc_name.clone(), (triggers, rx.clone()));
|
||||||
|
|
||||||
let tempfile = FilesController::new(&file.filename.as_str(), hm)
|
let tempfile = FilesController::new(&file.filename.as_str(), hm)
|
||||||
.with_path(&file.src);
|
.with_path(&file.src);
|
||||||
|
|
@ -90,23 +91,24 @@ pub mod v2 {
|
||||||
let rx = rx.clone();
|
let rx = rx.clone();
|
||||||
let serv_cont = ServicesController::new().with_access_name(
|
let serv_cont = ServicesController::new().with_access_name(
|
||||||
&serv.hostname,
|
&serv.hostname,
|
||||||
access_url
|
&access_url
|
||||||
);
|
);
|
||||||
// triggers
|
// triggers
|
||||||
let triggers = Triggers::new_service(&serv.triggers.on_lost, serv.triggers.wait);
|
let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone());
|
||||||
|
let triggers = Triggers::new_service(arc, serv.triggers.wait);
|
||||||
|
|
||||||
if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
|
if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
|
||||||
proc.add_process(&prc.name, triggers, rx);
|
proc.add_process(&prc.name, triggers, rx);
|
||||||
} else {
|
} else {
|
||||||
// vecdeque for queue
|
// vecdeque for queue
|
||||||
let mut vec: VecDeque<&'a str> = VecDeque::new();
|
let mut vec: VecDeque<Arc<str>> = VecDeque::new();
|
||||||
vec.push_back(&prc.name);
|
vec.push_back(proc_name.clone());
|
||||||
// connection_queue
|
// connection_queue
|
||||||
let mut connection_queue: BTreeMap<u32, VecDeque<&'a str>> = BTreeMap::new();
|
let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> = BTreeMap::new();
|
||||||
connection_queue.insert(serv.triggers.wait, vec);
|
connection_queue.insert(serv.triggers.wait, vec);
|
||||||
// event_reg
|
// event_reg
|
||||||
let mut hm = HashMap::new();
|
let mut hm = HashMap::new();
|
||||||
hm.insert(prc.name.as_str(), (triggers, rx));
|
hm.insert(proc_name.clone(), (triggers, rx));
|
||||||
|
|
||||||
let serv_cont = serv_cont.with_params(connection_queue, hm);
|
let serv_cont = serv_cont.with_params(connection_queue, hm);
|
||||||
self.services.push_back(serv_cont);
|
self.services.push_back(serv_cont);
|
||||||
|
|
@ -118,21 +120,48 @@ pub mod v2 {
|
||||||
pub fn get_stats(&self) -> String {
|
pub fn get_stats(&self) -> String {
|
||||||
format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
|
format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
|
||||||
}
|
}
|
||||||
async fn proccess_prc<T>(&mut self) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<'a> ProcessUnit<'a> for Supervisor<'a> {
|
impl ProcessUnit for Supervisor {
|
||||||
async fn process(&'a mut self) {
|
async fn process(&mut self) {
|
||||||
info!("Initializing monitoring ...");
|
info!("Initializing monitoring ...");
|
||||||
loop {
|
loop {
|
||||||
// let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
|
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
|
||||||
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
|
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
|
||||||
// let res = tokio::join!(prc.process(), file.process(), serv.process());
|
// let res = tokio::join!(prc.process(), file.process(), serv.process());
|
||||||
if let Some(mut val) = self.prcs.pop_front() {
|
if let Some(mut val) = self.prcs.pop_front() {
|
||||||
tokio::spawn(async move {val.process().await;}).await;
|
tasks.push(
|
||||||
|
tokio::spawn( async move {
|
||||||
|
val.process().await;
|
||||||
|
ControllerResult::Process(Some(val))
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(mut val) = self.files.pop_front() {
|
||||||
|
tasks.push(
|
||||||
|
tokio::spawn( async move {
|
||||||
|
val.process().await;
|
||||||
|
ControllerResult::File(Some(val))
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(mut val) = self.services.pop_front() {
|
||||||
|
tasks.push(
|
||||||
|
tokio::spawn( async move {
|
||||||
|
val.process().await;
|
||||||
|
ControllerResult::Service(Some(val))
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
for task in tasks {
|
||||||
|
match task.await {
|
||||||
|
Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val),
|
||||||
|
Ok(ControllerResult::File(Some(val))) => self.files.push_back(val),
|
||||||
|
Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val),
|
||||||
|
Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."),
|
||||||
|
_ => { /* DEAD END (CAN NOT BE EXECUTED) */},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue