Compare commits
No commits in common. "2495fb84cfc68eff3c2fdd970838513f1159dfa0" and "0d68efd461b704591512559fdc14b352eaefc2cc" have entirely different histories.
2495fb84cf
...
0d68efd461
|
|
@ -8,7 +8,6 @@ pub enum DependencyType {
|
||||||
Service,
|
Service,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum ServiceState {
|
pub enum ServiceState {
|
||||||
Ok,
|
Ok,
|
||||||
Unavailable
|
Unavailable
|
||||||
|
|
@ -50,24 +49,17 @@ impl<'a> FileTriggerType {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Triggers<'a> {
|
pub enum Triggers<'a> {
|
||||||
File{ on_change: &'a str, on_delete: &'a str },
|
File{ on_change: &'a str, on_delete: &'a str },
|
||||||
Service{on_lost: &'a str, wait: u32},
|
Service(&'a str),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Triggers<'a> {
|
impl<'a> Triggers<'a> {
|
||||||
pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> {
|
pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> {
|
||||||
Triggers::File { on_change, on_delete }
|
Triggers::File { on_change, on_delete }
|
||||||
}
|
}
|
||||||
pub fn new_service(on_lost: &'a str, wait_time: u32) -> Triggers<'a> {
|
pub fn new_service(on_lost: &'a str) -> Triggers<'a> {
|
||||||
Triggers::Service{on_lost, wait: wait_time}
|
Triggers::Service(on_lost)
|
||||||
}
|
|
||||||
pub fn to_service_negative_event(&'a self, service_name: &'a str) -> Option<Events<'a>> {
|
|
||||||
if let Triggers::Service { on_lost, .. } = self {
|
|
||||||
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, &on_lost)))
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -100,7 +92,7 @@ pub enum NegativeOutcomes<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ProcessUnit<'a> {
|
pub trait ProcessUnit<'a> {
|
||||||
fn process(&'a mut self) -> impl std::future::Future<Output = ()> + Send;
|
fn process(&mut self) -> impl std::future::Future<Output = ()> + Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # an Error enum (next will be deleted and replaced)
|
/// # an Error enum (next will be deleted and replaced)
|
||||||
|
|
|
||||||
|
|
@ -1,292 +1,286 @@
|
||||||
use crate::options::structs::{CustomError, Files};
|
use crate::options::structs::{CustomError, Files};
|
||||||
use super::prcs::{is_active, is_frozen};
|
use super::prcs::{is_active, is_frozen};
|
||||||
use inotify::{EventMask, Inotify, WatchMask};
|
use inotify::{EventMask, Inotify, WatchMask};
|
||||||
use std::borrow::BorrowMut;
|
use std::borrow::BorrowMut;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::mpsc::Sender as Sender;
|
use tokio::sync::mpsc::Sender as MpscSender;
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
use crate::options::structs::Events;
|
use crate::options::structs::Events;
|
||||||
|
|
||||||
pub mod v2 {
|
pub mod v2 {
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
|
|
||||||
// use std::collections::HashMap;
|
// use std::collections::HashMap;
|
||||||
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
|
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
|
||||||
use super::*;
|
use super::*;
|
||||||
use std::{collections::HashMap, path::Path};
|
use std::{collections::HashMap, path::Path};
|
||||||
|
|
||||||
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
|
// type EventHandlers<'a> = HashMap<service name, sender object>
|
||||||
// type EventHandlers<'a> = HashMap<service name, sender object>
|
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
|
||||||
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
|
|
||||||
|
|
||||||
struct FilesController<'a> {
|
struct FilesController<'a> {
|
||||||
name : &'a str,
|
name: &'a str,
|
||||||
path : String,
|
path: String,
|
||||||
watcher : Option<Inotify>,
|
watcher: Option<Inotify>,
|
||||||
// obj: Arc<Files>,
|
// obj: Arc<Files>,
|
||||||
triggers : EventHandlers<'a>,
|
triggers: EventHandlers<'a>,
|
||||||
code_name : String,
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> FilesController<'a> {
|
impl<'a> FilesController<'a> {
|
||||||
pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> {
|
pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> {
|
||||||
Self {
|
Self {
|
||||||
name,
|
name,
|
||||||
path : String::new(),
|
path : String::new(),
|
||||||
watcher: None,
|
watcher: None,
|
||||||
triggers,
|
triggers,
|
||||||
code_name : name.to_string(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> {
|
}
|
||||||
self.path = path.as_ref().to_string_lossy().into_owned();
|
pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||||
self.watcher = {
|
self.path = path.as_ref().to_string_lossy().into_owned();
|
||||||
match create_watcher(self.name, &self.path).await {
|
self.watcher = {
|
||||||
Ok(val) => Some(val),
|
match create_watcher(self.name, &self.path).await {
|
||||||
Err(er) => {
|
Ok(val) => Some(val),
|
||||||
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
|
Err(er) => {
|
||||||
return Err(er)
|
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
|
||||||
}
|
return Err(er)
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
self.code_name = format!("{}{}", &self.path, &self.code_name);
|
};
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
|
||||||
async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) {
|
|
||||||
let _ = self.triggers.iter()
|
|
||||||
.map(|(prc_name, (triggers, channel))| async {
|
|
||||||
let _ = channel.send({
|
|
||||||
match &trigger_type {
|
|
||||||
None => {
|
|
||||||
Events::Positive(&self.code_name)
|
|
||||||
},
|
|
||||||
Some(event) => {
|
|
||||||
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
|
|
||||||
event.event_from_file_trigger_controller(self.name, triggers)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}).await;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
impl<'a> ProcessUnit<'a> for FilesController<'a> {
|
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
|
||||||
async fn process(&'a mut self) {
|
let _ = self.triggers.iter()
|
||||||
// polling file check
|
.map(|(prc_name, (triggers, channel))| async {
|
||||||
// 1) existing check
|
let _ = channel.send({
|
||||||
if let Ok(_) = check_file(self.name, &self.path).await {
|
match &trigger_type {
|
||||||
match &mut self.watcher {
|
None => Events::Positive(self.name),
|
||||||
Some(notify) => {
|
Some(event) => {
|
||||||
let mut buffer = [0; 1024];
|
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
|
||||||
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
|
event.event_from_file_trigger_controller(self.name, triggers)
|
||||||
if let (recreate_watcher, true) = (
|
|
||||||
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
|
|
||||||
notif_events.any(|mask| mask.mask == EventMask::MODIFY)
|
|
||||||
) {
|
|
||||||
warn!("File {} ({}) was changed", self.name, &self.path);
|
|
||||||
if recreate_watcher {
|
|
||||||
self.watcher = match create_watcher(self.name, &self.path).await {
|
|
||||||
Ok(notifier) => Some(notifier),
|
|
||||||
Err(er) => {
|
|
||||||
error!("Failed to recreate watcher for {} ({}) due to {}",
|
|
||||||
self.name,
|
|
||||||
&self.path,
|
|
||||||
er
|
|
||||||
);
|
|
||||||
None
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.trigger_on(Some(FileTriggerType::OnChange)).await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
None => { /* DEAD END */},
|
|
||||||
}
|
}
|
||||||
} else {
|
}).await;
|
||||||
warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
|
});
|
||||||
self.trigger_on(Some(FileTriggerType::OnDelete)).await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
self.trigger_on(None).await;
|
|
||||||
// 2) change check
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl<'a> ProcessUnit<'a> for FilesController<'a> {
|
||||||
/// # Fn `create_watcher`
|
async fn process(&mut self) {
|
||||||
/// ## for creating watcher on file's delete | update events
|
// polling file check
|
||||||
///
|
// 1) existing check
|
||||||
/// *input* : `&str`, `&str`
|
if let Ok(_) = check_file(self.name, &self.path).await {
|
||||||
///
|
match &mut self.watcher {
|
||||||
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
|
Some(notify) => {
|
||||||
///
|
let mut buffer = [0; 1024];
|
||||||
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
|
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
|
||||||
///
|
if let (recreate_watcher, true) = (
|
||||||
/// *managing* : current file's name: &str, path in local storage to current file: &str
|
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
|
||||||
///
|
notif_events.any(|mask| mask.mask == EventMask::MODIFY)
|
||||||
/// *depends on* : -
|
) {
|
||||||
///
|
warn!("File {} ({}) was changed", self.name, &self.path);
|
||||||
pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
|
if recreate_watcher {
|
||||||
let src = format!("{}{}", path, filename);
|
self.watcher = match create_watcher(self.name, &self.path).await {
|
||||||
let inotify: Inotify = Inotify::init()?;
|
Ok(notifier) => Some(notifier),
|
||||||
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
|
Err(er) => {
|
||||||
Ok(inotify)
|
error!("Failed to recreate watcher for {} ({}) due to {}",
|
||||||
}
|
self.name,
|
||||||
|
&self.path,
|
||||||
/// # Fn `create_watcher`
|
er
|
||||||
/// ## for managing processes by checking dep files' states
|
);
|
||||||
///
|
None
|
||||||
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
},
|
||||||
///
|
}
|
||||||
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
|
|
||||||
///
|
|
||||||
/// *initiator* : fn `utils::running_handler`
|
|
||||||
///
|
|
||||||
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
|
||||||
///
|
|
||||||
/// *depends on* : Files
|
|
||||||
///
|
|
||||||
pub async fn file_handler(
|
|
||||||
name: &str,
|
|
||||||
files: &[Files],
|
|
||||||
tx: Arc<mpsc::Sender<u8>>,
|
|
||||||
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
for (i, file) in files.iter().enumerate() {
|
|
||||||
// let src = format!("{}{}", file.src, file.filename);
|
|
||||||
if check_file(&file.filename, &file.src).await.is_err() {
|
|
||||||
if !is_active(name).await || is_frozen(name).await {
|
|
||||||
return Err(anyhow::Error::msg("Process is frozen or stopped"));
|
|
||||||
}
|
|
||||||
match file.triggers.on_delete.as_str() {
|
|
||||||
"stay" => {
|
|
||||||
tx.send(9).await.unwrap();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
"stop" => {
|
|
||||||
if is_active(name).await {
|
|
||||||
tx.send(1).await.unwrap();
|
|
||||||
}
|
|
||||||
return Err(anyhow::Error::msg("Process was stopped"));
|
|
||||||
}
|
|
||||||
"hold" => {
|
|
||||||
if is_active(name).await {
|
|
||||||
tx.send(2).await.unwrap();
|
|
||||||
return Err(anyhow::Error::msg("Process was frozen"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
||||||
tx.send(101).await.unwrap();
|
|
||||||
return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if is_active(name).await && !is_frozen(name).await {
|
|
||||||
let watchers = watchers.clone();
|
|
||||||
// println!("mutex: {:?}", watchers);
|
|
||||||
let mut buffer = [0; 128];
|
|
||||||
let mut mutex_guard = watchers.lock().await;
|
|
||||||
if let Some(notify) = mutex_guard.get_mut(i) {
|
|
||||||
let events = notify.read_events(&mut buffer);
|
|
||||||
// println!("{:?}", events);
|
|
||||||
if events.is_ok() {
|
|
||||||
let events: Vec<EventMask> = events
|
|
||||||
.unwrap()
|
|
||||||
.map(|mask| mask.mask)
|
|
||||||
.filter(|mask| {
|
|
||||||
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
for event in events {
|
|
||||||
if let EventMask::DELETE_SELF = event {
|
|
||||||
// ! warning (DELETE_SELF event) !
|
|
||||||
// println!("! warning (DELETE_SELF event) !");
|
|
||||||
// * watcher recreation after dealing with file recreation mechanism in text editors
|
|
||||||
let mutex = notify.borrow_mut();
|
|
||||||
|
|
||||||
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
|
|
||||||
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
|
|
||||||
*mutex = watcher;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
match file.triggers.on_change.as_str() {
|
|
||||||
"stop" => {
|
|
||||||
let _ = tx.send(7).await;
|
|
||||||
}
|
|
||||||
"restart" => {
|
|
||||||
let _ = tx.send(8).await;
|
|
||||||
}
|
|
||||||
"stay" => {
|
|
||||||
let _ = tx.send(9).await;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
let _ = tx.send(101).await;
|
|
||||||
}
|
}
|
||||||
|
self.trigger_on(Some(FileTriggerType::OnChange)).await;
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
None => { /* DEAD END */},
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// # Fn `check_file`
|
|
||||||
/// ## for checking existance of current file
|
|
||||||
///
|
|
||||||
/// *input* : `&str`, `&str`
|
|
||||||
///
|
|
||||||
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
|
|
||||||
///
|
|
||||||
/// *initiator* : fn `file_handler`
|
|
||||||
///
|
|
||||||
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
|
|
||||||
///
|
|
||||||
/// *depends on* : network activity
|
|
||||||
///
|
|
||||||
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
|
|
||||||
let arc_name = Arc::new(filename.to_string());
|
|
||||||
let arc_path = Arc::new(path.to_string());
|
|
||||||
tokio::task::spawn_blocking(move || {
|
|
||||||
let file_concat = format!("{}{}", arc_path, arc_name);
|
|
||||||
let path = Path::new(&file_concat);
|
|
||||||
if path.exists() {
|
|
||||||
Ok(())
|
|
||||||
} else {
|
} else {
|
||||||
Err(CustomError::Fatal)
|
warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
|
||||||
|
self.trigger_on(Some(FileTriggerType::OnDelete)).await;
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
})
|
self.trigger_on(None).await;
|
||||||
.await
|
// 2) change check
|
||||||
.unwrap_or_else(|_| {
|
}
|
||||||
panic!("Corrupted while file check process");
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
/// # Fn `create_watcher`
|
||||||
mod files_unittests {
|
/// ## for creating watcher on file's delete | update events
|
||||||
use super::*;
|
///
|
||||||
#[tokio::test]
|
/// *input* : `&str`, `&str`
|
||||||
async fn try_to_create_watcher() {
|
///
|
||||||
let res = create_watcher("dep-file", "./tests/examples/").await;
|
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
|
||||||
assert!(res.is_ok());
|
///
|
||||||
}
|
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
|
||||||
#[tokio::test]
|
///
|
||||||
async fn try_to_create_invalid_watcher() {
|
/// *managing* : current file's name: &str, path in local storage to current file: &str
|
||||||
let res = create_watcher("invalid-file", "/path/to/the/no/dir").await;
|
///
|
||||||
assert!(res.is_err());
|
/// *depends on* : -
|
||||||
}
|
///
|
||||||
#[tokio::test]
|
pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
|
||||||
async fn check_existing_file() {
|
let src = format!("{}{}", path, filename);
|
||||||
let res = check_file("dep-file", "./tests/examples/").await;
|
let inotify: Inotify = Inotify::init()?;
|
||||||
assert!(res.is_ok());
|
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
|
||||||
}
|
Ok(inotify)
|
||||||
#[tokio::test]
|
}
|
||||||
async fn check_non_existing_file() {
|
|
||||||
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
|
/// # Fn `create_watcher`
|
||||||
assert!(res.is_err());
|
/// ## for managing processes by checking dep files' states
|
||||||
|
///
|
||||||
|
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
||||||
|
///
|
||||||
|
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `utils::running_handler`
|
||||||
|
///
|
||||||
|
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
||||||
|
///
|
||||||
|
/// *depends on* : Files
|
||||||
|
///
|
||||||
|
pub async fn file_handler(
|
||||||
|
name: &str,
|
||||||
|
files: &[Files],
|
||||||
|
tx: Arc<mpsc::Sender<u8>>,
|
||||||
|
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
for (i, file) in files.iter().enumerate() {
|
||||||
|
// let src = format!("{}{}", file.src, file.filename);
|
||||||
|
if check_file(&file.filename, &file.src).await.is_err() {
|
||||||
|
if !is_active(name).await || is_frozen(name).await {
|
||||||
|
return Err(anyhow::Error::msg("Process is frozen or stopped"));
|
||||||
|
}
|
||||||
|
match file.triggers.on_delete.as_str() {
|
||||||
|
"stay" => {
|
||||||
|
tx.send(9).await.unwrap();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
"stop" => {
|
||||||
|
if is_active(name).await {
|
||||||
|
tx.send(1).await.unwrap();
|
||||||
|
}
|
||||||
|
return Err(anyhow::Error::msg("Process was stopped"));
|
||||||
|
}
|
||||||
|
"hold" => {
|
||||||
|
if is_active(name).await {
|
||||||
|
tx.send(2).await.unwrap();
|
||||||
|
return Err(anyhow::Error::msg("Process was frozen"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||||
|
tx.send(101).await.unwrap();
|
||||||
|
return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if is_active(name).await && !is_frozen(name).await {
|
||||||
|
let watchers = watchers.clone();
|
||||||
|
// println!("mutex: {:?}", watchers);
|
||||||
|
let mut buffer = [0; 128];
|
||||||
|
let mut mutex_guard = watchers.lock().await;
|
||||||
|
if let Some(notify) = mutex_guard.get_mut(i) {
|
||||||
|
let events = notify.read_events(&mut buffer);
|
||||||
|
// println!("{:?}", events);
|
||||||
|
if events.is_ok() {
|
||||||
|
let events: Vec<EventMask> = events
|
||||||
|
.unwrap()
|
||||||
|
.map(|mask| mask.mask)
|
||||||
|
.filter(|mask| {
|
||||||
|
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
for event in events {
|
||||||
|
if let EventMask::DELETE_SELF = event {
|
||||||
|
// ! warning (DELETE_SELF event) !
|
||||||
|
// println!("! warning (DELETE_SELF event) !");
|
||||||
|
// * watcher recreation after dealing with file recreation mechanism in text editors
|
||||||
|
let mutex = notify.borrow_mut();
|
||||||
|
|
||||||
|
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
|
||||||
|
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
|
||||||
|
*mutex = watcher;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
match file.triggers.on_change.as_str() {
|
||||||
|
"stop" => {
|
||||||
|
let _ = tx.send(7).await;
|
||||||
|
}
|
||||||
|
"restart" => {
|
||||||
|
let _ = tx.send(8).await;
|
||||||
|
}
|
||||||
|
"stay" => {
|
||||||
|
let _ = tx.send(9).await;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
let _ = tx.send(101).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # Fn `check_file`
|
||||||
|
/// ## for checking existance of current file
|
||||||
|
///
|
||||||
|
/// *input* : `&str`, `&str`
|
||||||
|
///
|
||||||
|
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `file_handler`
|
||||||
|
///
|
||||||
|
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
|
||||||
|
///
|
||||||
|
/// *depends on* : network activity
|
||||||
|
///
|
||||||
|
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
|
||||||
|
let arc_name = Arc::new(filename.to_string());
|
||||||
|
let arc_path = Arc::new(path.to_string());
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let file_concat = format!("{}{}", arc_path, arc_name);
|
||||||
|
let path = Path::new(&file_concat);
|
||||||
|
if path.exists() {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(CustomError::Fatal)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
panic!("Corrupted while file check process");
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod files_unittests {
|
||||||
|
use super::*;
|
||||||
|
#[tokio::test]
|
||||||
|
async fn try_to_create_watcher() {
|
||||||
|
let res = create_watcher("dep-file", "./tests/examples/").await;
|
||||||
|
assert!(res.is_ok());
|
||||||
|
}
|
||||||
|
#[tokio::test]
|
||||||
|
async fn try_to_create_invalid_watcher() {
|
||||||
|
let res = create_watcher("invalid-file", "/path/to/the/no/dir").await;
|
||||||
|
assert!(res.is_err());
|
||||||
|
}
|
||||||
|
#[tokio::test]
|
||||||
|
async fn check_existing_file() {
|
||||||
|
let res = check_file("dep-file", "./tests/examples/").await;
|
||||||
|
assert!(res.is_ok());
|
||||||
|
}
|
||||||
|
#[tokio::test]
|
||||||
|
async fn check_non_existing_file() {
|
||||||
|
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
|
||||||
|
assert!(res.is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -82,15 +82,15 @@ pub mod v2 {
|
||||||
ProcessState::Holding => {
|
ProcessState::Holding => {
|
||||||
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
|
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
|
||||||
if let Err(er) = unfreeze_process(self.name).await {
|
if let Err(er) = unfreeze_process(self.name).await {
|
||||||
error!("Cannot unfreeze process {} : {}", self.name, er);
|
error!("Cannot unfreeze process {} due to {}", self.name, er);
|
||||||
} else {
|
} else {
|
||||||
self.state = ProcessState::Pending;
|
self.state = ProcessState::Pending;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
ProcessState::Stopped => {
|
ProcessState::Stopped => {
|
||||||
info!("No negative dependecies events on {} process. Starting ...", self.name);
|
info!("No negative dependecies events on {} process. Starting ...", self.name);
|
||||||
if let Err(er) = start_process(self.name, &self.obj.path).await {
|
if let Err(_) = start_process(self.name, &self.obj.path).await {
|
||||||
error!("Cannot start process {} : {}", self.name, er);
|
error!("Cannot start process {} due to {}", self.name, "system unrecognized error");
|
||||||
} else {
|
} else {
|
||||||
self.state = ProcessState::Pending;
|
self.state = ProcessState::Pending;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,33 +5,24 @@ use std::net::{TcpStream, ToSocketAddrs};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::time::{Duration, Instant};
|
use tokio::time::{Duration, Instant};
|
||||||
use tokio::sync::mpsc::Sender as Sender;
|
use tokio::sync::mpsc::Sender as MpscSender;
|
||||||
|
|
||||||
pub mod v2 {
|
pub mod v2 {
|
||||||
use log::info;
|
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceWaitConfig, ServiceState};
|
||||||
|
|
||||||
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use std::collections::{HashMap, BTreeMap, VecDeque};
|
use std::collections::{HashMap, BTreeMap, VecDeque};
|
||||||
|
|
||||||
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
|
|
||||||
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
|
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
|
||||||
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
|
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
|
||||||
// type wrapper for service wait queue
|
// type wrapper for service wait queue
|
||||||
type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>;
|
type ConnectionQueue<'a> = BTreeMap<u8, VecDeque<(&'a str, Triggers<'a>, MpscSender<Events<'a>>)>>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct ServicesController<'a> {
|
struct ServicesController<'a> {
|
||||||
// i.e. yandex.ru
|
|
||||||
name : &'a str,
|
name : &'a str,
|
||||||
// i.e. yandex.ru:443
|
|
||||||
access_url : String,
|
access_url : String,
|
||||||
// "OK" or "Unavailable"
|
|
||||||
state: ServiceState,
|
state: ServiceState,
|
||||||
// btree map with key as max wait time and it's key to hashmap
|
config: ServiceWaitConfig,
|
||||||
config: ConnectionQueue<'a>,
|
|
||||||
// Map of processes with their (trigger and mpsc sender)
|
|
||||||
event_registrator : EventHandlers<'a>,
|
event_registrator : EventHandlers<'a>,
|
||||||
}
|
}
|
||||||
impl<'a> ServicesController<'a> {
|
impl<'a> ServicesController<'a> {
|
||||||
|
|
@ -40,128 +31,40 @@ pub mod v2 {
|
||||||
name : "",
|
name : "",
|
||||||
access_url : String::new(),
|
access_url : String::new(),
|
||||||
state : ServiceState::Unavailable,
|
state : ServiceState::Unavailable,
|
||||||
config: ConnectionQueue::new(),
|
config: ServiceWaitConfig::default(),
|
||||||
event_registrator : EventHandlers::new(),
|
event_registrator : EventHandlers::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn with_params(
|
pub async fn with_params(&mut self, hostname: &'a str, port: Option<&'a str>, event_registrator: EventHandlers<'a>) -> anyhow::Result<()> {
|
||||||
&mut self,
|
|
||||||
hostname: &'a str,
|
|
||||||
port: Option<&'a str>,
|
|
||||||
conn_queue: ConnectionQueue<'a>,
|
|
||||||
event_reg: EventHandlers<'a>,
|
|
||||||
) -> &mut ServicesController<'a> {
|
|
||||||
self.name = hostname;
|
self.name = hostname;
|
||||||
self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)));
|
self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)));
|
||||||
self.config = conn_queue;
|
self.event_registrator = event_registrator;
|
||||||
self.event_registrator = event_reg;
|
Ok(())
|
||||||
self
|
|
||||||
}
|
}
|
||||||
pub fn add_process(
|
async fn check_state(&mut self) -> anyhow::Result<()> {
|
||||||
&mut self,
|
|
||||||
proc_name: &'a str,
|
|
||||||
trigger: Triggers<'a>,
|
|
||||||
sender: MpscSender<'a>,
|
|
||||||
) {
|
|
||||||
// queue add
|
|
||||||
if let Triggers::Service { wait, .. } = trigger {
|
|
||||||
self.config.entry(wait)
|
|
||||||
.and_modify(|el| el.push_back(proc_name))
|
|
||||||
.or_insert({
|
|
||||||
let mut temp = VecDeque::new();
|
|
||||||
temp.push_back(proc_name);
|
|
||||||
temp
|
|
||||||
});
|
|
||||||
}
|
|
||||||
// event add
|
|
||||||
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
|
|
||||||
}
|
|
||||||
async fn check_state(&self) -> anyhow::Result<()> {
|
|
||||||
let mut addrs = self.access_url.to_socket_addrs()?;
|
let mut addrs = self.access_url.to_socket_addrs()?;
|
||||||
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
|
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
|
||||||
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
|
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn trigger_on(&'a mut self) {
|
async fn trigger_on(&mut self) {}
|
||||||
match self.state {
|
|
||||||
ServiceState::Ok => {
|
|
||||||
let _ = self.event_registrator
|
|
||||||
.iter()
|
|
||||||
.map(|(_, (_, el))| async {
|
|
||||||
let _ = el.send(Events::Positive(&self.access_url)).await;
|
|
||||||
});
|
|
||||||
},
|
|
||||||
ServiceState::Unavailable => {
|
|
||||||
// looped check and notifying
|
|
||||||
self.looped_check().await;
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
async fn looped_check(self: &'a mut Self) {
|
|
||||||
let longest = self.config.last_entry().unwrap();
|
|
||||||
let longest = longest.key();
|
|
||||||
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
|
|
||||||
let timer = tokio::time::Instant::now();
|
|
||||||
let mut attempt: u32 = 1;
|
|
||||||
let access_url = Arc::new(self.access_url.clone());
|
|
||||||
// let event_registrator = &mut self.event_registrator;
|
|
||||||
|
|
||||||
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
|
|
||||||
// let access_url = access_url.clone();
|
|
||||||
loop {
|
|
||||||
interapter.tick().await;
|
|
||||||
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
|
|
||||||
attempt += 1;
|
|
||||||
|
|
||||||
let state_check_result = self.check_state().await;
|
|
||||||
|
|
||||||
if state_check_result.is_ok() {
|
|
||||||
info!("Connection to {} is `OK` now", &access_url);
|
|
||||||
self.state = ServiceState::Ok;
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
let now = timer.elapsed();
|
|
||||||
let iterator = self.config.iter()
|
|
||||||
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
|
|
||||||
.flat_map(|(_, a)| a.iter().copied())
|
|
||||||
.collect::<VecDeque<&str>>();
|
|
||||||
|
|
||||||
for name in iterator {
|
|
||||||
let sender_opt = self.event_registrator.get(name)
|
|
||||||
.map(|(trigger, sender)|
|
|
||||||
(trigger.to_service_negative_event(name), sender)
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Some((tr, tx)) = sender_opt {
|
|
||||||
let _ = tx.send(tr.unwrap()).await;
|
|
||||||
} else {
|
|
||||||
error!("Cannot find {} channel sender in {} service", name, &self.access_url)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}).await {
|
|
||||||
info!("Timeout of establishing connection to {}. ", &access_url);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
impl<'a> ProcessUnit<'a> for ServicesController<'a> {
|
impl<'a> ProcessUnit<'a> for ServicesController<'a> {
|
||||||
async fn process(&'a mut self) {
|
async fn process(&mut self) {
|
||||||
// check_service(hostname, port)
|
// check_service(hostname, port)
|
||||||
let current_state = self.check_state().await;
|
let current_state = self.check_state().await;
|
||||||
match (&self.state, current_state) {
|
match (&self.state, current_state) {
|
||||||
(ServiceState::Unavailable, Ok(_)) => {
|
(ServiceState::Unavailable, Ok(_)) => {
|
||||||
warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
|
warn!("Unreachable for connection service `{}`. Notifying {} process(es)", &self.access_url, self.event_registrator.len());
|
||||||
self.state = ServiceState::Ok;
|
//
|
||||||
self.trigger_on().await;
|
self.state = ServiceState::Unavailable;
|
||||||
},
|
},
|
||||||
(ServiceState::Ok, Err(_)) => {
|
(ServiceState::Ok, Err(_)) => {
|
||||||
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
|
warn!("Connection with `{}` service was established. Notifying {} process(es)", &self.access_url, self.event_registrator.len());
|
||||||
|
//
|
||||||
self.state = ServiceState::Unavailable;
|
self.state = ServiceState::Unavailable;
|
||||||
self.trigger_on().await;
|
|
||||||
},
|
},
|
||||||
(ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url),
|
|
||||||
_ => { /* DEAD END WITH NO INTEREST */ },
|
_ => { /* DEAD END WITH NO INTEREST */ },
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue