supervisor work

feature/configv2
prplV 2025-04-23 10:34:07 -04:00
parent 2495fb84cf
commit 541b0f52dd
4 changed files with 129 additions and 32 deletions

View File

@ -23,12 +23,58 @@ use tokio::sync::mpsc;
use tokio::time::Duration; use tokio::time::Duration;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
// controllers import
use prcs::v2::ProcessesController;
use files::v2::FilesController;
use services::v2::ServicesController;
const GET_ID_CMD: &str = "hostname"; const GET_ID_CMD: &str = "hostname";
pub mod v2 { pub mod v2 {
use std::collections::{HashMap, LinkedList};
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit};
use super::*; use super::*;
struct Supervisor<'a> {
prcs : LinkedList<ProcessesController<'a>>,
files : LinkedList<FilesController<'a>>,
services : LinkedList<ServicesController<'a>>,
}
impl<'a> Supervisor<'a> {
pub fn new(config: &'a Processes) -> Supervisor<'a> {
let mut p = LinkedList::new();
let mut f = LinkedList::new();
let mut s = LinkedList::new();
let _ = config.processes.iter()
.map(|prc| {
let (rx, tx) = mpsc::channel::<Events<'a>>(10);
let temp = ProcessesController::new(&prc.name, tx);
if !p.contains(&temp) {
p.push_back(temp);
}
let rx = Arc::new(rx);
// files
let _ = prc.dependencies.files.iter()
.map(|file| async {
let mut hm = HashMap::new();
let triggers = FileTriggersForController { on_change: &file.triggers.on_change, on_delete: &file.triggers.on_delete};
hm.insert(&prc.name, (triggers, rx.clone()));
let tempfile = FilesController::new(&file.filename, hm).with_path(file.src).await;
});
// servs
let _ = prc.dependencies.services.iter()
.map(|serv| {
});
});
Supervisor { prcs: p, files: f, services: s }
}
}
// spawn tasks // spawn tasks
// spawn prc // spawn prc
// spawn files // spawn files
@ -55,6 +101,20 @@ pub mod v2 {
Ok(()) Ok(())
} }
// async fn generate_controllers<'a>(config: Processes) -> (HashSet<ProcessesController<'a>>, HashSet<FilesController<'a>>, HashSet<ServicesController<'a>>) {
// let mut prcs: HashSet<ProcessesController<'a>> = HashSet::new();
// let mut files: HashSet<FilesController<'a>> = HashSet::new();
// let mut services: HashSet<ServicesController<'a>> = HashSet::new();
// for prc in config.processes {
// let (rx, tx) = mpsc::channel::<Events<'a>>(10);
// // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path);
// let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path);
// let a = new_prc.process().await;
// }
// (prcs, files, services)
// }
// spawn prc check with semaphore check // spawn prc check with semaphore check
async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) } async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) }

View File

@ -21,7 +21,7 @@
// type EventHandlers<'a> = HashMap<service name, sender object> // type EventHandlers<'a> = HashMap<service name, sender object>
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
struct FilesController<'a> { pub struct FilesController<'a> {
name : &'a str, name : &'a str,
path : String, path : String,
watcher : Option<Inotify>, watcher : Option<Inotify>,
@ -30,6 +30,12 @@
code_name : String, code_name : String,
} }
impl<'a> PartialEq for FilesController<'a> {
fn eq(&self, other: &Self) -> bool {
self.path == other.path && self.name == other.name
}
}
impl<'a> FilesController<'a> { impl<'a> FilesController<'a> {
pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> { pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> {
Self { Self {
@ -40,7 +46,7 @@
code_name : name.to_string(), code_name : name.to_string(),
} }
} }
pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> { pub async fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController<'a>> {
self.path = path.as_ref().to_string_lossy().into_owned(); self.path = path.as_ref().to_string_lossy().into_owned();
self.watcher = { self.watcher = {
match create_watcher(self.name, &self.path).await { match create_watcher(self.name, &self.path).await {
@ -52,7 +58,7 @@
} }
}; };
self.code_name = format!("{}{}", &self.path, &self.code_name); self.code_name = format!("{}{}", &self.path, &self.code_name);
Ok(()) Ok(self)
} }
async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) { async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) {
let _ = self.triggers.iter() let _ = self.triggers.iter()

View File

@ -2,24 +2,47 @@ use log::{error, warn};
use std::process::{Command, Output}; use std::process::{Command, Output};
use std::sync::Arc; use std::sync::Arc;
use tokio::time::Duration; use tokio::time::Duration;
use crate::options::structs::{TrackingProcess, ProcessState, Events, NegativeOutcomes, ProcessUnit}; use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit};
use std::collections::HashSet; use std::collections::HashSet;
use tokio::sync::mpsc::Receiver as MpscReciever; use tokio::sync::mpsc::Receiver as MpscReciever;
pub mod v2 { pub mod v2 {
use log::info; use log::info;
use crate::options::structs::DependencyType; use crate::options::structs::DependencyType;
use std::path::Path;
use super::*; use super::*;
pub struct ProcessController<'a> {
pub struct ProcessesController<'a> {
name: &'a str, name: &'a str,
obj: Arc<TrackingProcess>, bin: String,
// obj: Arc<TrackingProcess>,
state: ProcessState, state: ProcessState,
event_reader: MpscReciever<Events<'a>>, event_reader: MpscReciever<Events<'a>>,
negative_events: HashSet<&'a str>, negative_events: HashSet<&'a str>,
} }
impl<'a> ProcessController<'a> { impl<'a> PartialEq for ProcessesController<'a> {
fn eq(&self, other: &Self) -> bool {
self.bin == other.bin
}
}
impl<'a> ProcessesController<'a> {
pub fn new(name: &'a str, event_reader: MpscReciever<Events<'a>>) -> ProcessesController<'a> {
ProcessesController {
name,
bin: String::new(),
state : ProcessState::Stopped,
event_reader,
negative_events : HashSet::new(),
}
}
pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController<'a> {
self.bin = bin.as_ref().to_string_lossy().into_owned();
self
}
async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) { async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
match trigger { match trigger {
"stay" => { "stay" => {
@ -41,7 +64,7 @@ pub mod v2 {
}, },
"restart" => { "restart" => {
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
let _ = restart_process(self.name, &self.obj.path).await; let _ = restart_process(self.name, &self.bin).await;
}, },
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
} }
@ -49,8 +72,29 @@ pub mod v2 {
} }
} }
impl<'a> ProcessUnit<'a> for ProcessController<'a> { impl<'a> ProcessUnit<'a> for ProcessesController<'a> {
async fn process(&mut self) { async fn process(&mut self) {
if self.negative_events.len() == 0 {
match self.state {
ProcessState::Holding => {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
if let Err(er) = unfreeze_process(self.name).await {
error!("Cannot unfreeze process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
ProcessState::Stopped => {
info!("No negative dependecies events on {} process. Starting ...", self.name);
if let Err(er) = start_process(self.name, &self.bin).await {
error!("Cannot start process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
_ => {},
}
}
while let Ok(event) = self.event_reader.try_recv() { while let Ok(event) = self.event_reader.try_recv() {
match event { match event {
Events::Positive(target) => { Events::Positive(target) => {
@ -77,27 +121,6 @@ pub mod v2 {
}, },
} }
} }
if self.negative_events.len() == 0 {
match self.state {
ProcessState::Holding => {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
if let Err(er) = unfreeze_process(self.name).await {
error!("Cannot unfreeze process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
ProcessState::Stopped => {
info!("No negative dependecies events on {} process. Starting ...", self.name);
if let Err(er) = start_process(self.name, &self.obj.path).await {
error!("Cannot start process {} : {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
},
_ => {},
}
}
} }
} }
} }

View File

@ -22,8 +22,9 @@ pub mod v2 {
type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>; type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>;
#[derive(Debug)] #[derive(Debug)]
struct ServicesController<'a> { pub struct ServicesController<'a> {
// i.e. yandex.ru // i.e. yandex.ru
#[allow(unused)]
name : &'a str, name : &'a str,
// i.e. yandex.ru:443 // i.e. yandex.ru:443
access_url : String, access_url : String,
@ -34,6 +35,13 @@ pub mod v2 {
// Map of processes with their (trigger and mpsc sender) // Map of processes with their (trigger and mpsc sender)
event_registrator : EventHandlers<'a>, event_registrator : EventHandlers<'a>,
} }
impl<'a> PartialEq for ServicesController<'a> {
fn eq(&self, other: &Self) -> bool {
self.access_url == other.access_url
}
}
impl<'a> ServicesController<'a> { impl<'a> ServicesController<'a> {
pub fn new() -> ServicesController<'a> { pub fn new() -> ServicesController<'a> {
ServicesController { ServicesController {