Compare commits

..

No commits in common. "2495fb84cfc68eff3c2fdd970838513f1159dfa0" and "0d68efd461b704591512559fdc14b352eaefc2cc" have entirely different histories.

4 changed files with 288 additions and 399 deletions

View File

@ -8,7 +8,6 @@ pub enum DependencyType {
Service, Service,
} }
#[derive(Debug)]
pub enum ServiceState { pub enum ServiceState {
Ok, Ok,
Unavailable Unavailable
@ -50,24 +49,17 @@ impl<'a> FileTriggerType {
} }
} }
#[derive(Debug)]
pub enum Triggers<'a> { pub enum Triggers<'a> {
File{ on_change: &'a str, on_delete: &'a str }, File{ on_change: &'a str, on_delete: &'a str },
Service{on_lost: &'a str, wait: u32}, Service(&'a str),
} }
impl<'a> Triggers<'a> { impl<'a> Triggers<'a> {
pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> { pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> {
Triggers::File { on_change, on_delete } Triggers::File { on_change, on_delete }
} }
pub fn new_service(on_lost: &'a str, wait_time: u32) -> Triggers<'a> { pub fn new_service(on_lost: &'a str) -> Triggers<'a> {
Triggers::Service{on_lost, wait: wait_time} Triggers::Service(on_lost)
}
pub fn to_service_negative_event(&'a self, service_name: &'a str) -> Option<Events<'a>> {
if let Triggers::Service { on_lost, .. } = self {
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, &on_lost)))
}
None
} }
} }
@ -100,7 +92,7 @@ pub enum NegativeOutcomes<'a> {
} }
pub trait ProcessUnit<'a> { pub trait ProcessUnit<'a> {
fn process(&'a mut self) -> impl std::future::Future<Output = ()> + Send; fn process(&mut self) -> impl std::future::Future<Output = ()> + Send;
} }
/// # an Error enum (next will be deleted and replaced) /// # an Error enum (next will be deleted and replaced)

View File

@ -1,15 +1,15 @@
use crate::options::structs::{CustomError, Files}; use crate::options::structs::{CustomError, Files};
use super::prcs::{is_active, is_frozen}; use super::prcs::{is_active, is_frozen};
use inotify::{EventMask, Inotify, WatchMask}; use inotify::{EventMask, Inotify, WatchMask};
use std::borrow::BorrowMut; use std::borrow::BorrowMut;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender as Sender; use tokio::sync::mpsc::Sender as MpscSender;
use tokio::time::Duration; use tokio::time::Duration;
use crate::options::structs::Events; use crate::options::structs::Events;
pub mod v2 { pub mod v2 {
use log::{error, info, warn}; use log::{error, info, warn};
// use std::collections::HashMap; // use std::collections::HashMap;
@ -17,17 +17,15 @@
use super::*; use super::*;
use std::{collections::HashMap, path::Path}; use std::{collections::HashMap, path::Path};
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
// type EventHandlers<'a> = HashMap<service name, sender object> // type EventHandlers<'a> = HashMap<service name, sender object>
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
struct FilesController<'a> { struct FilesController<'a> {
name : &'a str, name: &'a str,
path : String, path: String,
watcher : Option<Inotify>, watcher: Option<Inotify>,
// obj: Arc<Files>, // obj: Arc<Files>,
triggers : EventHandlers<'a>, triggers: EventHandlers<'a>,
code_name : String,
} }
impl<'a> FilesController<'a> { impl<'a> FilesController<'a> {
@ -37,7 +35,6 @@
path : String::new(), path : String::new(),
watcher: None, watcher: None,
triggers, triggers,
code_name : name.to_string(),
} }
} }
pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> { pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> {
@ -51,17 +48,14 @@
} }
} }
}; };
self.code_name = format!("{}{}", &self.path, &self.code_name);
Ok(()) Ok(())
} }
async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) { async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
let _ = self.triggers.iter() let _ = self.triggers.iter()
.map(|(prc_name, (triggers, channel))| async { .map(|(prc_name, (triggers, channel))| async {
let _ = channel.send({ let _ = channel.send({
match &trigger_type { match &trigger_type {
None => { None => Events::Positive(self.name),
Events::Positive(&self.code_name)
},
Some(event) => { Some(event) => {
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name); info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
event.event_from_file_trigger_controller(self.name, triggers) event.event_from_file_trigger_controller(self.name, triggers)
@ -72,7 +66,7 @@
} }
} }
impl<'a> ProcessUnit<'a> for FilesController<'a> { impl<'a> ProcessUnit<'a> for FilesController<'a> {
async fn process(&'a mut self) { async fn process(&mut self) {
// polling file check // polling file check
// 1) existing check // 1) existing check
if let Ok(_) = check_file(self.name, &self.path).await { if let Ok(_) = check_file(self.name, &self.path).await {
@ -114,47 +108,47 @@
// 2) change check // 2) change check
} }
} }
} }
/// # Fn `create_watcher` /// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events /// ## for creating watcher on file's delete | update events
/// ///
/// *input* : `&str`, `&str` /// *input* : `&str`, `&str`
/// ///
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction /// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
/// ///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons` /// *initiator* : fn `file_handler`, fn `utils::run_daemons`
/// ///
/// *managing* : current file's name: &str, path in local storage to current file: &str /// *managing* : current file's name: &str, path in local storage to current file: &str
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> { pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
let src = format!("{}{}", path, filename); let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?; let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
Ok(inotify) Ok(inotify)
} }
/// # Fn `create_watcher` /// # Fn `create_watcher`
/// ## for managing processes by checking dep files' states /// ## for managing processes by checking dep files' states
/// ///
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>` /// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
/// ///
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check /// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
/// ///
/// *initiator* : fn `utils::running_handler` /// *initiator* : fn `utils::running_handler`
/// ///
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>` /// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
/// ///
/// *depends on* : Files /// *depends on* : Files
/// ///
pub async fn file_handler( pub async fn file_handler(
name: &str, name: &str,
files: &[Files], files: &[Files],
tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>, watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
for (i, file) in files.iter().enumerate() { for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename); // let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() { if check_file(&file.filename, &file.src).await.is_err() {
@ -233,22 +227,22 @@
} }
tokio::task::yield_now().await; tokio::task::yield_now().await;
Ok(()) Ok(())
} }
/// # Fn `check_file` /// # Fn `check_file`
/// ## for checking existance of current file /// ## for checking existance of current file
/// ///
/// *input* : `&str`, `&str` /// *input* : `&str`, `&str`
/// ///
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error /// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
/// ///
/// *initiator* : fn `file_handler` /// *initiator* : fn `file_handler`
/// ///
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str` /// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
/// ///
/// *depends on* : network activity /// *depends on* : network activity
/// ///
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string()); let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string()); let arc_path = Arc::new(path.to_string());
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
@ -264,10 +258,10 @@
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
panic!("Corrupted while file check process"); panic!("Corrupted while file check process");
}) })
} }
#[cfg(test)] #[cfg(test)]
mod files_unittests { mod files_unittests {
use super::*; use super::*;
#[tokio::test] #[tokio::test]
async fn try_to_create_watcher() { async fn try_to_create_watcher() {
@ -289,4 +283,4 @@
let res = check_file("invalid-file", "/path/to/the/no/dir").await; let res = check_file("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err()); assert!(res.is_err());
} }
} }

View File

@ -82,15 +82,15 @@ pub mod v2 {
ProcessState::Holding => { ProcessState::Holding => {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name); info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
if let Err(er) = unfreeze_process(self.name).await { if let Err(er) = unfreeze_process(self.name).await {
error!("Cannot unfreeze process {} : {}", self.name, er); error!("Cannot unfreeze process {} due to {}", self.name, er);
} else { } else {
self.state = ProcessState::Pending; self.state = ProcessState::Pending;
} }
}, },
ProcessState::Stopped => { ProcessState::Stopped => {
info!("No negative dependecies events on {} process. Starting ...", self.name); info!("No negative dependecies events on {} process. Starting ...", self.name);
if let Err(er) = start_process(self.name, &self.obj.path).await { if let Err(_) = start_process(self.name, &self.obj.path).await {
error!("Cannot start process {} : {}", self.name, er); error!("Cannot start process {} due to {}", self.name, "system unrecognized error");
} else { } else {
self.state = ProcessState::Pending; self.state = ProcessState::Pending;
} }

View File

@ -5,33 +5,24 @@ use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::{Duration, Instant}; use tokio::time::{Duration, Instant};
use tokio::sync::mpsc::Sender as Sender; use tokio::sync::mpsc::Sender as MpscSender;
pub mod v2 { pub mod v2 {
use log::info; use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceWaitConfig, ServiceState};
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
use super::*; use super::*;
use std::collections::{HashMap, BTreeMap, VecDeque}; use std::collections::{HashMap, BTreeMap, VecDeque};
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>; // type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
// type wrapper for service wait queue // type wrapper for service wait queue
type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>; type ConnectionQueue<'a> = BTreeMap<u8, VecDeque<(&'a str, Triggers<'a>, MpscSender<Events<'a>>)>>;
#[derive(Debug)]
struct ServicesController<'a> { struct ServicesController<'a> {
// i.e. yandex.ru
name : &'a str, name : &'a str,
// i.e. yandex.ru:443
access_url : String, access_url : String,
// "OK" or "Unavailable"
state: ServiceState, state: ServiceState,
// btree map with key as max wait time and it's key to hashmap config: ServiceWaitConfig,
config: ConnectionQueue<'a>,
// Map of processes with their (trigger and mpsc sender)
event_registrator : EventHandlers<'a>, event_registrator : EventHandlers<'a>,
} }
impl<'a> ServicesController<'a> { impl<'a> ServicesController<'a> {
@ -40,128 +31,40 @@ pub mod v2 {
name : "", name : "",
access_url : String::new(), access_url : String::new(),
state : ServiceState::Unavailable, state : ServiceState::Unavailable,
config: ConnectionQueue::new(), config: ServiceWaitConfig::default(),
event_registrator : EventHandlers::new(), event_registrator : EventHandlers::new(),
} }
} }
pub fn with_params( pub async fn with_params(&mut self, hostname: &'a str, port: Option<&'a str>, event_registrator: EventHandlers<'a>) -> anyhow::Result<()> {
&mut self,
hostname: &'a str,
port: Option<&'a str>,
conn_queue: ConnectionQueue<'a>,
event_reg: EventHandlers<'a>,
) -> &mut ServicesController<'a> {
self.name = hostname; self.name = hostname;
self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))); self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)));
self.config = conn_queue; self.event_registrator = event_registrator;
self.event_registrator = event_reg; Ok(())
self
} }
pub fn add_process( async fn check_state(&mut self) -> anyhow::Result<()> {
&mut self,
proc_name: &'a str,
trigger: Triggers<'a>,
sender: MpscSender<'a>,
) {
// queue add
if let Triggers::Service { wait, .. } = trigger {
self.config.entry(wait)
.and_modify(|el| el.push_back(proc_name))
.or_insert({
let mut temp = VecDeque::new();
temp.push_back(proc_name);
temp
});
}
// event add
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
}
async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?; let mut addrs = self.access_url.to_socket_addrs()?;
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))) return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
} }
Ok(()) Ok(())
} }
async fn trigger_on(&'a mut self) { async fn trigger_on(&mut self) {}
match self.state {
ServiceState::Ok => {
let _ = self.event_registrator
.iter()
.map(|(_, (_, el))| async {
let _ = el.send(Events::Positive(&self.access_url)).await;
});
},
ServiceState::Unavailable => {
// looped check and notifying
self.looped_check().await;
},
}
}
async fn looped_check(self: &'a mut Self) {
let longest = self.config.last_entry().unwrap();
let longest = longest.key();
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
let timer = tokio::time::Instant::now();
let mut attempt: u32 = 1;
let access_url = Arc::new(self.access_url.clone());
// let event_registrator = &mut self.event_registrator;
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
// let access_url = access_url.clone();
loop {
interapter.tick().await;
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
attempt += 1;
let state_check_result = self.check_state().await;
if state_check_result.is_ok() {
info!("Connection to {} is `OK` now", &access_url);
self.state = ServiceState::Ok;
break;
} else {
let now = timer.elapsed();
let iterator = self.config.iter()
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
.flat_map(|(_, a)| a.iter().copied())
.collect::<VecDeque<&str>>();
for name in iterator {
let sender_opt = self.event_registrator.get(name)
.map(|(trigger, sender)|
(trigger.to_service_negative_event(name), sender)
);
if let Some((tr, tx)) = sender_opt {
let _ = tx.send(tr.unwrap()).await;
} else {
error!("Cannot find {} channel sender in {} service", name, &self.access_url)
}
}
}
}
}).await {
info!("Timeout of establishing connection to {}. ", &access_url);
}
}
} }
impl<'a> ProcessUnit<'a> for ServicesController<'a> { impl<'a> ProcessUnit<'a> for ServicesController<'a> {
async fn process(&'a mut self) { async fn process(&mut self) {
// check_service(hostname, port) // check_service(hostname, port)
let current_state = self.check_state().await; let current_state = self.check_state().await;
match (&self.state, current_state) { match (&self.state, current_state) {
(ServiceState::Unavailable, Ok(_)) => { (ServiceState::Unavailable, Ok(_)) => {
warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); warn!("Unreachable for connection service `{}`. Notifying {} process(es)", &self.access_url, self.event_registrator.len());
self.state = ServiceState::Ok; //
self.trigger_on().await; self.state = ServiceState::Unavailable;
}, },
(ServiceState::Ok, Err(_)) => { (ServiceState::Ok, Err(_)) => {
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); warn!("Connection with `{}` service was established. Notifying {} process(es)", &self.access_url, self.event_registrator.len());
//
self.state = ServiceState::Unavailable; self.state = ServiceState::Unavailable;
self.trigger_on().await;
}, },
(ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url),
_ => { /* DEAD END WITH NO INTEREST */ }, _ => { /* DEAD END WITH NO INTEREST */ },
} }
} }