new structs, controller and processors

feature/configv2
prplV 2025-04-17 09:59:33 -04:00
parent cd7669d942
commit e3f07f42a6
4 changed files with 157 additions and 36 deletions

View File

@ -7,11 +7,61 @@ pub enum DependencyType {
File, File,
Service, Service,
} }
pub struct ServiceWaitConfig { wait: u32, delay: u32}
impl Default for ServiceWaitConfig {
fn default() -> Self {
Self { wait: 0, delay: 5 }
}
}
pub enum FileTriggerType {
OnChange,
OnDelete,
}
impl std::fmt::Display for FileTriggerType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return match self {
FileTriggerType::OnChange => write!(f, "File was changed"),
FileTriggerType::OnDelete => write!(f, "File was moved or deleted"),
}
}
}
impl<'a> FileTriggerType {
pub fn event(&self, file_name: &'a str, trigger: &'a str) -> Events<'a> {
return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
}
}
pub fn event_from_file_trigger_controller(&self, file_name: &'a str, trigger: &FileTriggersForController<'a>) -> Events<'a> {
return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete)),
}
}
}
pub enum Triggers<'a> { pub enum Triggers<'a> {
File{ on_change: &'a str, on_delete: &'a str }, File{ on_change: &'a str, on_delete: &'a str },
Service(&'a str), Service(&'a str),
} }
impl<'a> Triggers<'a> {
pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> {
Triggers::File { on_change, on_delete }
}
pub fn new_service(on_lost: &'a str) -> Triggers<'a> {
Triggers::Service(on_lost)
}
}
pub struct FileTriggersForController<'a> { pub on_change: &'a str, pub on_delete: &'a str }
pub struct ServiceTriggersForController<'a>(&'a str);
impl std::fmt::Display for DependencyType { impl std::fmt::Display for DependencyType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return match self { return match self {
@ -20,6 +70,7 @@ impl std::fmt::Display for DependencyType {
} }
} }
} }
pub enum ProcessState { pub enum ProcessState {
Pending, Pending,
Holding, Holding,

View File

@ -10,49 +10,103 @@ use tokio::time::Duration;
use crate::options::structs::Events; use crate::options::structs::Events;
pub mod v2 { pub mod v2 {
// use std::collections::HashMap; use log::{error, info, warn};
use crate::options::structs::{Triggers, ProcessUnit};
use super::*;
use std::path::Path;
type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>; // use std::collections::HashMap;
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
use super::*;
use std::{collections::HashMap, path::Path};
// type EventHandlers<'a> = HashMap<service name, sender object>
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
//
type FileTriggersWrapper<'a> = HashMap<&'a str, Triggers<'a>>;
struct FilesController<'a> { struct FilesController<'a> {
name: &'a str, name: &'a str,
path: String, path: String,
watcher: Option<Inotify>, watcher: Option<Inotify>,
// obj: Arc<Files>, // obj: Arc<Files>,
triggers: Triggers<'a>, triggers: EventHandlers<'a>,
event_registrator: EventHandlers<'a>,
} }
impl<'a> FilesController<'a> { impl<'a> FilesController<'a> {
pub fn new(name: &'a str, triggers: Triggers<'a>, event_registrator: EventHandlers<'a>) -> FilesController<'a> { pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> {
Self { Self {
name, name,
path : String::new(), path : String::new(),
watcher: None, watcher: None,
triggers, triggers,
event_registrator,
} }
} }
pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> { pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> {
self.path = path.as_ref().to_string_lossy().into_owned(); self.path = path.as_ref().to_string_lossy().into_owned();
self.watcher = Some({ self.watcher = {
match create_watcher(self.name, &self.path).await { match create_watcher(self.name, &self.path).await {
Ok(val) => val, Ok(val) => Some(val),
Err(er) => return Err(er) Err(er) => {
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
return Err(er)
} }
}); }
};
Ok(()) Ok(())
} }
pub async fn trigger_on(&mut self) { async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
// trigger handler let _ = self.triggers.iter()
.map(|(_, (triggers, channel))| async {
let _ = channel.send({
match &trigger_type {
None => Events::Positive(self.name),
Some(event) => {
info!("Event on {} ({}) : {}", self.name, &self.path, event);
event.event_from_file_trigger_controller(self.name, triggers)
},
}
}).await;
});
} }
} }
impl<'a> ProcessUnit<'a> for FilesController<'a> { impl<'a> ProcessUnit<'a> for FilesController<'a> {
async fn process(&mut self) { async fn process(&mut self) {
// polling file check // polling file check
// 1) existing check
if let Ok(_) = check_file(self.name, &self.path).await {
match &mut self.watcher {
Some(notify) => {
let mut buffer = [0; 1024];
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
if let (recreate_watcher, true) = (
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
notif_events.any(|mask| mask.mask == EventMask::MODIFY)
) {
warn!("File {} ({}) was changed", self.name, &self.path);
if recreate_watcher {
self.watcher = match create_watcher(self.name, &self.path).await {
Ok(notifier) => Some(notifier),
Err(er) => {
error!("Failed to recreate watcher for {} ({}) due to {}",
self.name,
&self.path,
er
);
None
},
}
}
self.trigger_on(Some(FileTriggerType::OnChange)).await;
}
}
},
None => { /* DEAD END */},
}
} else {
warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
self.trigger_on(Some(FileTriggerType::OnDelete)).await;
return;
}
self.trigger_on(None).await;
// 2) change check
} }
} }
} }

View File

@ -1,4 +1,3 @@
use crate::options::structs::CustomError;
use log::{error, warn}; use log::{error, warn};
use std::process::{Command, Output}; use std::process::{Command, Output};
use std::sync::Arc; use std::sync::Arc;
@ -52,7 +51,7 @@ pub mod v2 {
impl<'a> ProcessUnit<'a> for ProcessController<'a> { impl<'a> ProcessUnit<'a> for ProcessController<'a> {
async fn process(&mut self) { async fn process(&mut self) {
if let Ok(event) = self.event_reader.try_recv() { while let Ok(event) = self.event_reader.try_recv() {
match event { match event {
Events::Positive(target) => { Events::Positive(target) => {
if self.negative_events.contains(target) { if self.negative_events.contains(target) {
@ -82,12 +81,18 @@ pub mod v2 {
match self.state { match self.state {
ProcessState::Holding => { ProcessState::Holding => {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name); info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
unfreeze_process(self.name).await; if let Err(er) = unfreeze_process(self.name).await {
error!("Cannot unfreeze process {} due to {}", self.name, er);
} else {
self.state = ProcessState::Pending;
}
}, },
ProcessState::Stopped => { ProcessState::Stopped => {
info!("No negative dependecies events on {} process. Starting ...", self.name); info!("No negative dependecies events on {} process. Starting ...", self.name);
if let Err(_) = start_process(self.name, &self.obj.path).await { if let Err(_) = start_process(self.name, &self.obj.path).await {
error!("Cannot start process {} due to {}", self.name, "system unrecognized error"); error!("Cannot start process {} due to {}", self.name, "system unrecognized error");
} else {
self.state = ProcessState::Pending;
} }
}, },
_ => {}, _ => {},
@ -255,14 +260,11 @@ pub async fn freeze_process(name: &str) {
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn unfreeze_process(name: &str) { pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> {
let _ = Command::new("pkill") let _ = Command::new("pkill")
.args(["-CONT", name]) .args(["-CONT", name])
.output() .output()?;
.unwrap_or_else(|_| { Ok(())
error!("Failed to unfreeze process");
std::process::exit(101);
});
} }
/// # Fn `restart_process` /// # Fn `restart_process`
@ -278,7 +280,7 @@ pub async fn unfreeze_process(name: &str) {
/// ///
/// *depends on* : fn `start_process`, fn `terminate_process` /// *depends on* : fn `start_process`, fn `terminate_process`
/// ///
pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> { pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> {
terminate_process(name).await; terminate_process(name).await;
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
start_process(name, path).await start_process(name, path).await
@ -297,7 +299,7 @@ pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError>
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> {
// let runsh = format!("{} {}", "exec", path); // let runsh = format!("{} {}", "exec", path);
let mut command = Command::new(path); let mut command = Command::new(path);
// command.arg(path); // command.arg(path);
@ -308,8 +310,7 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
Ok(()) Ok(())
} }
Err(er) => { Err(er) => {
println!("{:?}", er); Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er)))
Err(CustomError::Fatal)
} }
} }
} }

View File

@ -6,23 +6,38 @@ use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::{Duration, Instant}; use tokio::time::{Duration, Instant};
use tokio::sync::mpsc::Sender as MpscSender; use tokio::sync::mpsc::Sender as MpscSender;
use crate::options::structs::Events;
pub mod v2 { pub mod v2 {
use crate::options::structs::{Triggers, ProcessUnit}; use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceWaitConfig};
use super::*; use super::*;
use std::collections::HashMap; use std::collections::HashMap;
type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>; // type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
struct ServicesController<'a> { struct ServicesController<'a> {
name : &'a str, name : &'a str,
// obj: Arc<Services>, access_url : String,
triggers: Triggers<'a>, config: ServiceWaitConfig,
event_registrator : EventHandlers<'a>, event_registrator : EventHandlers<'a>,
} }
// self impl impl<'a> ServicesController<'a> {
pub fn new() -> ServicesController<'a> {
ServicesController {
name : "",
access_url : String::new(),
config: ServiceWaitConfig::default(),
event_registrator : EventHandlers::new(),
}
}
pub async fn with_params(&mut self, hostname: &'a str, port: Option<&'a str>, event_registrator: EventHandlers<'a>) -> anyhow::Result<()> {
self.name = hostname;
self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)));
self.event_registrator = event_registrator;
Ok(())
}
}
impl<'a> ProcessUnit<'a> for ServicesController<'a> { impl<'a> ProcessUnit<'a> for ServicesController<'a> {
async fn process(&mut self) { async fn process(&mut self) {