Compare commits

..

No commits in common. "281841b68a2ed5be2accbdb1735562306ad5561c" and "3c22a67052507031207b5b3b57fcf0138c4188bf" have entirely different histories.

4 changed files with 56 additions and 52 deletions

View File

@ -41,13 +41,13 @@ impl<'a> FileTriggerType {
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events { pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
return match self { return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)), FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger)), FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
} }
} }
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events { pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
return match self { return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())), FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger.on_delete.clone())), FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete.clone())),
} }
} }
} }

View File

@ -12,43 +12,46 @@
pub mod v2 { pub mod v2 {
use log::{error, info, warn}; use log::{error, info, warn};
// use std::collections::HashMap;
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit}; use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
use super::*; use super::*;
use std::{collections::HashMap, path::Path}; use std::{collections::HashMap, path::Path};
type MpscSender = Arc<Sender<Events>>; type MpscSender<'a> = Arc<Sender<Events<'a>>>;
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>; // type EventHandlers<'a> = HashMap<service name, sender object>
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
#[derive(Debug)] #[derive(Debug)]
pub struct FilesController { pub struct FilesController<'a> {
name : Arc<str>, name : &'a str,
path : String, path : String,
code_name : Arc<str>,
watcher : Option<Inotify>, watcher : Option<Inotify>,
triggers : EventHandlers, // obj: Arc<Files>,
triggers : EventHandlers<'a>,
code_name : String,
} }
impl PartialEq for FilesController { impl<'a> PartialEq for FilesController<'a> {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.code_name == other.code_name self.path == other.path && self.name == other.name
} }
} }
impl FilesController { impl<'a> FilesController<'a> {
pub fn new(name: &str, triggers: EventHandlers) -> FilesController { pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> {
let name: Arc<str> = Arc::from(name);
Self { Self {
name: name.clone(), name,
path : String::new(), path : String::new(),
watcher: None, watcher: None,
triggers, triggers,
code_name : name.clone(), code_name : name.to_string(),
} }
} }
pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController> { pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController<'a>> {
self.path = path.as_ref().to_string_lossy().into_owned(); self.path = path.as_ref().to_string_lossy().into_owned();
self.watcher = { self.watcher = {
match create_watcher(&self.name, &self.path) { match create_watcher(self.name, &self.path) {
Ok(val) => Some(val), Ok(val) => Some(val),
Err(er) => { Err(er) => {
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er); error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
@ -56,25 +59,25 @@
} }
} }
}; };
self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name)); self.code_name = format!("{}{}", &self.path, &self.code_name);
Ok(self) Ok(self)
} }
pub fn add_event(&mut self, file_controller : FilesController) { pub fn add_event(&mut self, file_controller : FilesController<'a>) {
for (k, v) in file_controller.triggers { for (k, v) in file_controller.triggers {
self.triggers.entry(k).or_insert(v); self.triggers.entry(k).or_insert(v);
} }
} }
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) { async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) {
let _ = self.triggers.iter() let _ = self.triggers.iter()
.map(|(prc_name, (triggers, channel))| async { .map(|(prc_name, (triggers, channel))| async {
let _ = channel.send({ let _ = channel.send({
match &trigger_type { match &trigger_type {
None => { None => {
Events::Positive(self.code_name.clone()) Events::Positive(&self.code_name)
}, },
Some(event) => { Some(event) => {
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name); info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
event.event_from_file_trigger_controller(self.code_name.clone(), triggers) event.event_from_file_trigger_controller(self.name, triggers)
}, },
} }
}).await; }).await;
@ -82,11 +85,11 @@
} }
} }
#[async_trait] #[async_trait]
impl ProcessUnit for FilesController { impl<'a> ProcessUnit<'a> for FilesController<'a> {
async fn process(&mut self) { async fn process(&'a mut self) {
// polling file check // polling file check
// 1) existing check // 1) existing check
if let Ok(_) = check_file(&self.name, &self.path).await { if let Ok(_) = check_file(self.name, &self.path).await {
match &mut self.watcher { match &mut self.watcher {
Some(notify) => { Some(notify) => {
let mut buffer = [0; 1024]; let mut buffer = [0; 1024];
@ -97,7 +100,7 @@
) { ) {
warn!("File {} ({}) was changed", self.name, &self.path); warn!("File {} ({}) was changed", self.name, &self.path);
if recreate_watcher { if recreate_watcher {
self.watcher = match create_watcher(&self.name, &self.path) { self.watcher = match create_watcher(self.name, &self.path) {
Ok(notifier) => Some(notifier), Ok(notifier) => Some(notifier),
Err(er) => { Err(er) => {
error!("Failed to recreate watcher for {} ({}) due to {}", error!("Failed to recreate watcher for {} ({}) due to {}",

View File

@ -15,32 +15,32 @@ pub mod v2 {
use super::*; use super::*;
#[derive(Debug)] #[derive(Debug)]
pub struct ProcessesController { pub struct ProcessesController<'a> {
name: Arc<str>, name: &'a str,
bin: String, bin: String,
// obj: Arc<TrackingProcess>, // obj: Arc<TrackingProcess>,
state: ProcessState, state: ProcessState,
event_reader: MpscReciever<Events>, event_reader: MpscReciever<Events<'a>>,
negative_events: HashSet<Arc<str>>, negative_events: HashSet<&'a str>,
} }
impl PartialEq for ProcessesController { impl<'a> PartialEq for ProcessesController<'a> {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.bin == other.bin self.bin == other.bin
} }
} }
impl ProcessesController { impl<'a> ProcessesController<'a> {
pub fn new(name: &str, event_reader: MpscReciever<Events>) -> ProcessesController { pub fn new(name: &'a str, event_reader: MpscReciever<Events<'a>>) -> ProcessesController<'a> {
ProcessesController { ProcessesController {
name : Arc::from(name), name,
bin: String::new(), bin: String::new(),
state : ProcessState::Stopped, state : ProcessState::Stopped,
event_reader, event_reader,
negative_events : HashSet::new(), negative_events : HashSet::new(),
} }
} }
pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController { pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController<'a> {
self.bin = bin.as_ref().to_string_lossy().into_owned(); self.bin = bin.as_ref().to_string_lossy().into_owned();
self self
} }
@ -51,22 +51,22 @@ pub mod v2 {
info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name); info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name);
}, },
"stop" => { "stop" => {
if is_active(&self.name).await { if is_active(self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
terminate_process(&self.name).await; terminate_process(self.name).await;
self.state = ProcessState::Stopped; self.state = ProcessState::Stopped;
} }
}, },
"hold" => { "hold" => {
if !is_frozen(&self.name).await { if !is_frozen(self.name).await {
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
freeze_process(&self.name).await; freeze_process(self.name).await;
self.state = ProcessState::Holding; self.state = ProcessState::Holding;
} }
}, },
"restart" => { "restart" => {
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
let _ = restart_process(&self.name, &self.bin).await; let _ = restart_process(self.name, &self.bin).await;
}, },
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
} }
@ -75,13 +75,13 @@ pub mod v2 {
} }
#[async_trait] #[async_trait]
impl ProcessUnit for ProcessesController { impl<'a> ProcessUnit<'a> for ProcessesController<'a> {
async fn process(&mut self) { async fn process(&'a mut self) {
if self.negative_events.len() == 0 { if self.negative_events.len() == 0 {
match self.state { match self.state {
ProcessState::Holding => { ProcessState::Holding => {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name); info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
if let Err(er) = unfreeze_process(&self.name).await { if let Err(er) = unfreeze_process(self.name).await {
error!("Cannot unfreeze process {} : {}", self.name, er); error!("Cannot unfreeze process {} : {}", self.name, er);
} else { } else {
self.state = ProcessState::Pending; self.state = ProcessState::Pending;
@ -89,7 +89,7 @@ pub mod v2 {
}, },
ProcessState::Stopped => { ProcessState::Stopped => {
info!("No negative dependecies events on {} process. Starting ...", self.name); info!("No negative dependecies events on {} process. Starting ...", self.name);
if let Err(er) = start_process(&self.name, &self.bin).await { if let Err(er) = start_process(self.name, &self.bin).await {
error!("Cannot start process {} : {}", self.name, er); error!("Cannot start process {} : {}", self.name, er);
} else { } else {
self.state = ProcessState::Pending; self.state = ProcessState::Pending;
@ -101,8 +101,8 @@ pub mod v2 {
while let Ok(event) = self.event_reader.try_recv() { while let Ok(event) = self.event_reader.try_recv() {
match event { match event {
Events::Positive(target) => { Events::Positive(target) => {
if self.negative_events.contains(&target) { if self.negative_events.contains(target) {
self.negative_events.remove(&target); self.negative_events.remove(target);
} }
}, },
Events::Negative(event) => { Events::Negative(event) => {
@ -110,12 +110,12 @@ pub mod v2 {
NegativeOutcomes::FileWasChanged(target, dep_type, trigger) | NegativeOutcomes::FileWasChanged(target, dep_type, trigger) |
NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) | NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) |
NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => { NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
if !self.negative_events.contains(&target) { if !self.negative_events.contains(target) {
self.negative_events.insert(target.clone()); self.negative_events.insert(target);
self.trigger_on( self.trigger_on(
&target, target,
&trigger, trigger,
dep_type dep_type
).await; ).await;
} }

View File

@ -1,4 +1,5 @@
use crate::options::structs::CustomError; use crate::options::structs::{CustomError, Services};
use super::prcs::{is_active, is_frozen};
use log::{error, warn}; use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc; use std::sync::Arc;