Compare commits
3 Commits
3c22a67052
...
281841b68a
| Author | SHA1 | Date |
|---|---|---|
|
|
281841b68a | |
|
|
34979a035d | |
|
|
09c1baed8e |
|
|
@ -41,13 +41,13 @@ impl<'a> FileTriggerType {
|
||||||
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
|
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
|
||||||
return match self {
|
return match self {
|
||||||
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
|
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
|
||||||
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
|
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
|
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
|
||||||
return match self {
|
return match self {
|
||||||
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
|
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
|
||||||
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete.clone())),
|
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger.on_delete.clone())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,46 +12,43 @@
|
||||||
|
|
||||||
pub mod v2 {
|
pub mod v2 {
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
|
|
||||||
// use std::collections::HashMap;
|
|
||||||
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
|
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
|
||||||
use super::*;
|
use super::*;
|
||||||
use std::{collections::HashMap, path::Path};
|
use std::{collections::HashMap, path::Path};
|
||||||
|
|
||||||
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
|
type MpscSender = Arc<Sender<Events>>;
|
||||||
// type EventHandlers<'a> = HashMap<service name, sender object>
|
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
|
||||||
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct FilesController<'a> {
|
pub struct FilesController {
|
||||||
name : &'a str,
|
name : Arc<str>,
|
||||||
path : String,
|
path : String,
|
||||||
|
code_name : Arc<str>,
|
||||||
watcher : Option<Inotify>,
|
watcher : Option<Inotify>,
|
||||||
// obj: Arc<Files>,
|
triggers : EventHandlers,
|
||||||
triggers : EventHandlers<'a>,
|
|
||||||
code_name : String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> PartialEq for FilesController<'a> {
|
impl PartialEq for FilesController {
|
||||||
fn eq(&self, other: &Self) -> bool {
|
fn eq(&self, other: &Self) -> bool {
|
||||||
self.path == other.path && self.name == other.name
|
self.code_name == other.code_name
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> FilesController<'a> {
|
impl FilesController {
|
||||||
pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> {
|
pub fn new(name: &str, triggers: EventHandlers) -> FilesController {
|
||||||
|
let name: Arc<str> = Arc::from(name);
|
||||||
Self {
|
Self {
|
||||||
name,
|
name: name.clone(),
|
||||||
path : String::new(),
|
path : String::new(),
|
||||||
watcher: None,
|
watcher: None,
|
||||||
triggers,
|
triggers,
|
||||||
code_name : name.to_string(),
|
code_name : name.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController<'a>> {
|
pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController> {
|
||||||
self.path = path.as_ref().to_string_lossy().into_owned();
|
self.path = path.as_ref().to_string_lossy().into_owned();
|
||||||
self.watcher = {
|
self.watcher = {
|
||||||
match create_watcher(self.name, &self.path) {
|
match create_watcher(&self.name, &self.path) {
|
||||||
Ok(val) => Some(val),
|
Ok(val) => Some(val),
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
|
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
|
||||||
|
|
@ -59,25 +56,25 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
self.code_name = format!("{}{}", &self.path, &self.code_name);
|
self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name));
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
pub fn add_event(&mut self, file_controller : FilesController<'a>) {
|
pub fn add_event(&mut self, file_controller : FilesController) {
|
||||||
for (k, v) in file_controller.triggers {
|
for (k, v) in file_controller.triggers {
|
||||||
self.triggers.entry(k).or_insert(v);
|
self.triggers.entry(k).or_insert(v);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) {
|
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
|
||||||
let _ = self.triggers.iter()
|
let _ = self.triggers.iter()
|
||||||
.map(|(prc_name, (triggers, channel))| async {
|
.map(|(prc_name, (triggers, channel))| async {
|
||||||
let _ = channel.send({
|
let _ = channel.send({
|
||||||
match &trigger_type {
|
match &trigger_type {
|
||||||
None => {
|
None => {
|
||||||
Events::Positive(&self.code_name)
|
Events::Positive(self.code_name.clone())
|
||||||
},
|
},
|
||||||
Some(event) => {
|
Some(event) => {
|
||||||
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
|
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
|
||||||
event.event_from_file_trigger_controller(self.name, triggers)
|
event.event_from_file_trigger_controller(self.code_name.clone(), triggers)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}).await;
|
}).await;
|
||||||
|
|
@ -85,11 +82,11 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<'a> ProcessUnit<'a> for FilesController<'a> {
|
impl ProcessUnit for FilesController {
|
||||||
async fn process(&'a mut self) {
|
async fn process(&mut self) {
|
||||||
// polling file check
|
// polling file check
|
||||||
// 1) existing check
|
// 1) existing check
|
||||||
if let Ok(_) = check_file(self.name, &self.path).await {
|
if let Ok(_) = check_file(&self.name, &self.path).await {
|
||||||
match &mut self.watcher {
|
match &mut self.watcher {
|
||||||
Some(notify) => {
|
Some(notify) => {
|
||||||
let mut buffer = [0; 1024];
|
let mut buffer = [0; 1024];
|
||||||
|
|
@ -100,7 +97,7 @@
|
||||||
) {
|
) {
|
||||||
warn!("File {} ({}) was changed", self.name, &self.path);
|
warn!("File {} ({}) was changed", self.name, &self.path);
|
||||||
if recreate_watcher {
|
if recreate_watcher {
|
||||||
self.watcher = match create_watcher(self.name, &self.path) {
|
self.watcher = match create_watcher(&self.name, &self.path) {
|
||||||
Ok(notifier) => Some(notifier),
|
Ok(notifier) => Some(notifier),
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
error!("Failed to recreate watcher for {} ({}) due to {}",
|
error!("Failed to recreate watcher for {} ({}) due to {}",
|
||||||
|
|
|
||||||
|
|
@ -15,32 +15,32 @@ pub mod v2 {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ProcessesController<'a> {
|
pub struct ProcessesController {
|
||||||
name: &'a str,
|
name: Arc<str>,
|
||||||
bin: String,
|
bin: String,
|
||||||
// obj: Arc<TrackingProcess>,
|
// obj: Arc<TrackingProcess>,
|
||||||
state: ProcessState,
|
state: ProcessState,
|
||||||
event_reader: MpscReciever<Events<'a>>,
|
event_reader: MpscReciever<Events>,
|
||||||
negative_events: HashSet<&'a str>,
|
negative_events: HashSet<Arc<str>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> PartialEq for ProcessesController<'a> {
|
impl PartialEq for ProcessesController {
|
||||||
fn eq(&self, other: &Self) -> bool {
|
fn eq(&self, other: &Self) -> bool {
|
||||||
self.bin == other.bin
|
self.bin == other.bin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ProcessesController<'a> {
|
impl ProcessesController {
|
||||||
pub fn new(name: &'a str, event_reader: MpscReciever<Events<'a>>) -> ProcessesController<'a> {
|
pub fn new(name: &str, event_reader: MpscReciever<Events>) -> ProcessesController {
|
||||||
ProcessesController {
|
ProcessesController {
|
||||||
name,
|
name : Arc::from(name),
|
||||||
bin: String::new(),
|
bin : String::new(),
|
||||||
state : ProcessState::Stopped,
|
state : ProcessState::Stopped,
|
||||||
event_reader,
|
event_reader,
|
||||||
negative_events : HashSet::new(),
|
negative_events : HashSet::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController<'a> {
|
pub fn with_exe(mut self, bin: impl AsRef<Path>) -> ProcessesController {
|
||||||
self.bin = bin.as_ref().to_string_lossy().into_owned();
|
self.bin = bin.as_ref().to_string_lossy().into_owned();
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
@ -51,22 +51,22 @@ pub mod v2 {
|
||||||
info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name);
|
info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name);
|
||||||
},
|
},
|
||||||
"stop" => {
|
"stop" => {
|
||||||
if is_active(self.name).await {
|
if is_active(&self.name).await {
|
||||||
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
|
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
|
||||||
terminate_process(self.name).await;
|
terminate_process(&self.name).await;
|
||||||
self.state = ProcessState::Stopped;
|
self.state = ProcessState::Stopped;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"hold" => {
|
"hold" => {
|
||||||
if !is_frozen(self.name).await {
|
if !is_frozen(&self.name).await {
|
||||||
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
|
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
|
||||||
freeze_process(self.name).await;
|
freeze_process(&self.name).await;
|
||||||
self.state = ProcessState::Holding;
|
self.state = ProcessState::Holding;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"restart" => {
|
"restart" => {
|
||||||
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
|
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
|
||||||
let _ = restart_process(self.name, &self.bin).await;
|
let _ = restart_process(&self.name, &self.bin).await;
|
||||||
},
|
},
|
||||||
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
|
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
|
||||||
}
|
}
|
||||||
|
|
@ -75,13 +75,13 @@ pub mod v2 {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<'a> ProcessUnit<'a> for ProcessesController<'a> {
|
impl ProcessUnit for ProcessesController {
|
||||||
async fn process(&'a mut self) {
|
async fn process(&mut self) {
|
||||||
if self.negative_events.len() == 0 {
|
if self.negative_events.len() == 0 {
|
||||||
match self.state {
|
match self.state {
|
||||||
ProcessState::Holding => {
|
ProcessState::Holding => {
|
||||||
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
|
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
|
||||||
if let Err(er) = unfreeze_process(self.name).await {
|
if let Err(er) = unfreeze_process(&self.name).await {
|
||||||
error!("Cannot unfreeze process {} : {}", self.name, er);
|
error!("Cannot unfreeze process {} : {}", self.name, er);
|
||||||
} else {
|
} else {
|
||||||
self.state = ProcessState::Pending;
|
self.state = ProcessState::Pending;
|
||||||
|
|
@ -89,7 +89,7 @@ pub mod v2 {
|
||||||
},
|
},
|
||||||
ProcessState::Stopped => {
|
ProcessState::Stopped => {
|
||||||
info!("No negative dependecies events on {} process. Starting ...", self.name);
|
info!("No negative dependecies events on {} process. Starting ...", self.name);
|
||||||
if let Err(er) = start_process(self.name, &self.bin).await {
|
if let Err(er) = start_process(&self.name, &self.bin).await {
|
||||||
error!("Cannot start process {} : {}", self.name, er);
|
error!("Cannot start process {} : {}", self.name, er);
|
||||||
} else {
|
} else {
|
||||||
self.state = ProcessState::Pending;
|
self.state = ProcessState::Pending;
|
||||||
|
|
@ -101,8 +101,8 @@ pub mod v2 {
|
||||||
while let Ok(event) = self.event_reader.try_recv() {
|
while let Ok(event) = self.event_reader.try_recv() {
|
||||||
match event {
|
match event {
|
||||||
Events::Positive(target) => {
|
Events::Positive(target) => {
|
||||||
if self.negative_events.contains(target) {
|
if self.negative_events.contains(&target) {
|
||||||
self.negative_events.remove(target);
|
self.negative_events.remove(&target);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Events::Negative(event) => {
|
Events::Negative(event) => {
|
||||||
|
|
@ -110,12 +110,12 @@ pub mod v2 {
|
||||||
NegativeOutcomes::FileWasChanged(target, dep_type, trigger) |
|
NegativeOutcomes::FileWasChanged(target, dep_type, trigger) |
|
||||||
NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) |
|
NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) |
|
||||||
NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
|
NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
|
||||||
if !self.negative_events.contains(target) {
|
if !self.negative_events.contains(&target) {
|
||||||
self.negative_events.insert(target);
|
self.negative_events.insert(target.clone());
|
||||||
|
|
||||||
self.trigger_on(
|
self.trigger_on(
|
||||||
target,
|
&target,
|
||||||
trigger,
|
&trigger,
|
||||||
dep_type
|
dep_type
|
||||||
).await;
|
).await;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,4 @@
|
||||||
use crate::options::structs::{CustomError, Services};
|
use crate::options::structs::CustomError;
|
||||||
use super::prcs::{is_active, is_frozen};
|
|
||||||
use log::{error, warn};
|
use log::{error, warn};
|
||||||
use std::net::{TcpStream, ToSocketAddrs};
|
use std::net::{TcpStream, ToSocketAddrs};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue