Compare commits

...

2 Commits

Author SHA1 Message Date
prplV 403285a937 OWNERSHIP FIX 3 2025-05-04 09:53:59 -04:00
prplV a16eb78b79 key: String -> Arc<str> 2025-05-04 09:31:42 -04:00
3 changed files with 61 additions and 34 deletions

View File

@ -58,9 +58,7 @@ async fn main() -> anyhow::Result<()>{
handler.push(ctrlc);
let monitoring = tokio::spawn(async move {
let config = if !rx_brd.is_empty() {
rx_brd.recv().await?
} else {
let config = {
let mut tick = tokio::time::interval(Duration::from_millis(500));
loop {
tick.tick().await;

View File

@ -36,38 +36,39 @@ pub mod v2 {
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
use super::*;
enum ControllerResult<'a> {
Process(Option<ProcessesController<'a>>),
File(Option<FilesController<'a>>),
Service(Option<ServicesController<'a>>),
enum ControllerResult {
Process(Option<ProcessesController>),
File(Option<FilesController>),
Service(Option<ServicesController>),
}
#[derive(Debug)]
struct Supervisor<'a> {
prcs : LinkedList<ProcessesController<'a>>,
files : LinkedList<FilesController<'a>>,
services : LinkedList<ServicesController<'a>>,
struct Supervisor {
prcs : LinkedList<ProcessesController>,
files : LinkedList<FilesController>,
services : LinkedList<ServicesController>,
}
impl<'a> Supervisor<'a> {
pub fn new() -> Supervisor<'a> {
impl Supervisor {
pub fn new() -> Supervisor {
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
}
pub async fn with_config(mut self, config: &'a Processes) -> Supervisor<'a> {
pub async fn with_config(mut self, config: &Processes) -> Supervisor {
let _ = config.processes.iter()
.for_each(|prc| {
let (rx, tx) = mpsc::channel::<Events<'a>>(10);
let (rx, tx) = mpsc::channel::<Events>(10);
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
if !self.prcs.contains(&temp) {
self.prcs.push_back(temp);
}
let rx = Arc::new(rx);
let proc_name: Arc<str> = Arc::from(prc.name.clone());
let _ = prc.dependencies.files.iter()
.for_each(|file| {
let mut hm = HashMap::new();
let triggers = FileTriggersForController { on_change: &file.triggers.on_change, on_delete: &file.triggers.on_delete};
hm.insert(prc.name.as_str(), (triggers, rx.clone()));
let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())};
hm.insert(proc_name.clone(), (triggers, rx.clone()));
let tempfile = FilesController::new(&file.filename.as_str(), hm)
.with_path(&file.src);
@ -90,23 +91,24 @@ pub mod v2 {
let rx = rx.clone();
let serv_cont = ServicesController::new().with_access_name(
&serv.hostname,
access_url
&access_url
);
// triggers
let triggers = Triggers::new_service(&serv.triggers.on_lost, serv.triggers.wait);
let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone());
let triggers = Triggers::new_service(arc, serv.triggers.wait);
if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
proc.add_process(&prc.name, triggers, rx);
} else {
// vecdeque for queue
let mut vec: VecDeque<&'a str> = VecDeque::new();
vec.push_back(&prc.name);
let mut vec: VecDeque<Arc<str>> = VecDeque::new();
vec.push_back(proc_name.clone());
// connection_queue
let mut connection_queue: BTreeMap<u32, VecDeque<&'a str>> = BTreeMap::new();
let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> = BTreeMap::new();
connection_queue.insert(serv.triggers.wait, vec);
// event_reg
let mut hm = HashMap::new();
hm.insert(prc.name.as_str(), (triggers, rx));
hm.insert(proc_name.clone(), (triggers, rx));
let serv_cont = serv_cont.with_params(connection_queue, hm);
self.services.push_back(serv_cont);
@ -118,21 +120,48 @@ pub mod v2 {
pub fn get_stats(&self) -> String {
format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
}
async fn proccess_prc<T>(&mut self) {
}
}
#[async_trait]
impl<'a> ProcessUnit<'a> for Supervisor<'a> {
async fn process(&'a mut self) {
impl ProcessUnit for Supervisor {
async fn process(&mut self) {
info!("Initializing monitoring ...");
loop {
// let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
// let res = tokio::join!(prc.process(), file.process(), serv.process());
if let Some(mut val) = self.prcs.pop_front() {
tokio::spawn(async move {val.process().await;}).await;
tasks.push(
tokio::spawn( async move {
val.process().await;
ControllerResult::Process(Some(val))
})
);
}
if let Some(mut val) = self.files.pop_front() {
tasks.push(
tokio::spawn( async move {
val.process().await;
ControllerResult::File(Some(val))
})
);
}
if let Some(mut val) = self.services.pop_front() {
tasks.push(
tokio::spawn( async move {
val.process().await;
ControllerResult::Service(Some(val))
})
);
}
for task in tasks {
match task.await {
Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val),
Ok(ControllerResult::File(Some(val))) => self.files.push_back(val),
Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val),
Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."),
_ => { /* DEAD END (CAN NOT BE EXECUTED) */},
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
@ -149,7 +178,7 @@ pub mod v2 {
) -> anyhow::Result<()> {
let mut supervisor = Supervisor::new().with_config(&config).await;
info!("Monitoring: {} ", &supervisor.get_stats());
supervisor.process().await;
supervisor.process().await;
Ok(())
}

View File

@ -16,7 +16,7 @@ pub mod v2 {
type MpscSender = Arc<Sender<Events>>;
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers = HashMap<String, (Triggers, MpscSender)>;
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
// type wrapper for service wait queue
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
@ -92,7 +92,7 @@ pub mod v2 {
});
}
// event add
self.event_registrator.entry(proc_name.to_string()).or_insert((trigger, sender));
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
}
async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?;
@ -148,7 +148,7 @@ pub mod v2 {
for name in iterator {
let proc_name = name.to_string();
info!("Trying to notify process `{}` ...", &proc_name);
let sender_opt = self.event_registrator.get(&proc_name)
let sender_opt = self.event_registrator.get(&name)
.map(|(trigger, sender)|
(trigger.to_service_negative_event(name.clone()), sender)
);