Compare commits
4 Commits
0d68efd461
...
2495fb84cf
| Author | SHA1 | Date |
|---|---|---|
|
|
2495fb84cf | |
|
|
c3fd0dd09f | |
|
|
502ea114a6 | |
|
|
28092d945a |
|
|
@ -8,6 +8,7 @@ pub enum DependencyType {
|
|||
Service,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ServiceState {
|
||||
Ok,
|
||||
Unavailable
|
||||
|
|
@ -49,17 +50,24 @@ impl<'a> FileTriggerType {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Triggers<'a> {
|
||||
File{ on_change: &'a str, on_delete: &'a str },
|
||||
Service(&'a str),
|
||||
Service{on_lost: &'a str, wait: u32},
|
||||
}
|
||||
|
||||
impl<'a> Triggers<'a> {
|
||||
pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> {
|
||||
Triggers::File { on_change, on_delete }
|
||||
}
|
||||
pub fn new_service(on_lost: &'a str) -> Triggers<'a> {
|
||||
Triggers::Service(on_lost)
|
||||
pub fn new_service(on_lost: &'a str, wait_time: u32) -> Triggers<'a> {
|
||||
Triggers::Service{on_lost, wait: wait_time}
|
||||
}
|
||||
pub fn to_service_negative_event(&'a self, service_name: &'a str) -> Option<Events<'a>> {
|
||||
if let Triggers::Service { on_lost, .. } = self {
|
||||
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, &on_lost)))
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -92,7 +100,7 @@ pub enum NegativeOutcomes<'a> {
|
|||
}
|
||||
|
||||
pub trait ProcessUnit<'a> {
|
||||
fn process(&mut self) -> impl std::future::Future<Output = ()> + Send;
|
||||
fn process(&'a mut self) -> impl std::future::Future<Output = ()> + Send;
|
||||
}
|
||||
|
||||
/// # an Error enum (next will be deleted and replaced)
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use std::borrow::BorrowMut;
|
|||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::Sender as MpscSender;
|
||||
use tokio::sync::mpsc::Sender as Sender;
|
||||
use tokio::time::Duration;
|
||||
use crate::options::structs::Events;
|
||||
|
||||
|
|
@ -17,8 +17,9 @@ pub mod v2 {
|
|||
use super::*;
|
||||
use std::{collections::HashMap, path::Path};
|
||||
|
||||
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
|
||||
// type EventHandlers<'a> = HashMap<service name, sender object>
|
||||
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
|
||||
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
|
||||
|
||||
struct FilesController<'a> {
|
||||
name : &'a str,
|
||||
|
|
@ -26,6 +27,7 @@ pub mod v2 {
|
|||
watcher : Option<Inotify>,
|
||||
// obj: Arc<Files>,
|
||||
triggers : EventHandlers<'a>,
|
||||
code_name : String,
|
||||
}
|
||||
|
||||
impl<'a> FilesController<'a> {
|
||||
|
|
@ -35,6 +37,7 @@ pub mod v2 {
|
|||
path : String::new(),
|
||||
watcher: None,
|
||||
triggers,
|
||||
code_name : name.to_string(),
|
||||
}
|
||||
}
|
||||
pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||
|
|
@ -48,14 +51,17 @@ pub mod v2 {
|
|||
}
|
||||
}
|
||||
};
|
||||
self.code_name = format!("{}{}", &self.path, &self.code_name);
|
||||
Ok(())
|
||||
}
|
||||
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
|
||||
async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) {
|
||||
let _ = self.triggers.iter()
|
||||
.map(|(prc_name, (triggers, channel))| async {
|
||||
let _ = channel.send({
|
||||
match &trigger_type {
|
||||
None => Events::Positive(self.name),
|
||||
None => {
|
||||
Events::Positive(&self.code_name)
|
||||
},
|
||||
Some(event) => {
|
||||
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
|
||||
event.event_from_file_trigger_controller(self.name, triggers)
|
||||
|
|
@ -66,7 +72,7 @@ pub mod v2 {
|
|||
}
|
||||
}
|
||||
impl<'a> ProcessUnit<'a> for FilesController<'a> {
|
||||
async fn process(&mut self) {
|
||||
async fn process(&'a mut self) {
|
||||
// polling file check
|
||||
// 1) existing check
|
||||
if let Ok(_) = check_file(self.name, &self.path).await {
|
||||
|
|
|
|||
|
|
@ -82,15 +82,15 @@ pub mod v2 {
|
|||
ProcessState::Holding => {
|
||||
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
|
||||
if let Err(er) = unfreeze_process(self.name).await {
|
||||
error!("Cannot unfreeze process {} due to {}", self.name, er);
|
||||
error!("Cannot unfreeze process {} : {}", self.name, er);
|
||||
} else {
|
||||
self.state = ProcessState::Pending;
|
||||
}
|
||||
},
|
||||
ProcessState::Stopped => {
|
||||
info!("No negative dependecies events on {} process. Starting ...", self.name);
|
||||
if let Err(_) = start_process(self.name, &self.obj.path).await {
|
||||
error!("Cannot start process {} due to {}", self.name, "system unrecognized error");
|
||||
if let Err(er) = start_process(self.name, &self.obj.path).await {
|
||||
error!("Cannot start process {} : {}", self.name, er);
|
||||
} else {
|
||||
self.state = ProcessState::Pending;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,24 +5,33 @@ use std::net::{TcpStream, ToSocketAddrs};
|
|||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::{Duration, Instant};
|
||||
use tokio::sync::mpsc::Sender as MpscSender;
|
||||
use tokio::sync::mpsc::Sender as Sender;
|
||||
|
||||
pub mod v2 {
|
||||
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceWaitConfig, ServiceState};
|
||||
use log::info;
|
||||
|
||||
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
|
||||
|
||||
use super::*;
|
||||
use std::collections::{HashMap, BTreeMap, VecDeque};
|
||||
|
||||
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
|
||||
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
|
||||
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
|
||||
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
|
||||
// type wrapper for service wait queue
|
||||
type ConnectionQueue<'a> = BTreeMap<u8, VecDeque<(&'a str, Triggers<'a>, MpscSender<Events<'a>>)>>;
|
||||
type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ServicesController<'a> {
|
||||
// i.e. yandex.ru
|
||||
name : &'a str,
|
||||
// i.e. yandex.ru:443
|
||||
access_url : String,
|
||||
// "OK" or "Unavailable"
|
||||
state: ServiceState,
|
||||
config: ServiceWaitConfig,
|
||||
// btree map with key as max wait time and it's key to hashmap
|
||||
config: ConnectionQueue<'a>,
|
||||
// Map of processes with their (trigger and mpsc sender)
|
||||
event_registrator : EventHandlers<'a>,
|
||||
}
|
||||
impl<'a> ServicesController<'a> {
|
||||
|
|
@ -31,40 +40,128 @@ pub mod v2 {
|
|||
name : "",
|
||||
access_url : String::new(),
|
||||
state : ServiceState::Unavailable,
|
||||
config: ServiceWaitConfig::default(),
|
||||
config: ConnectionQueue::new(),
|
||||
event_registrator : EventHandlers::new(),
|
||||
}
|
||||
}
|
||||
pub async fn with_params(&mut self, hostname: &'a str, port: Option<&'a str>, event_registrator: EventHandlers<'a>) -> anyhow::Result<()> {
|
||||
pub fn with_params(
|
||||
&mut self,
|
||||
hostname: &'a str,
|
||||
port: Option<&'a str>,
|
||||
conn_queue: ConnectionQueue<'a>,
|
||||
event_reg: EventHandlers<'a>,
|
||||
) -> &mut ServicesController<'a> {
|
||||
self.name = hostname;
|
||||
self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)));
|
||||
self.event_registrator = event_registrator;
|
||||
Ok(())
|
||||
self.config = conn_queue;
|
||||
self.event_registrator = event_reg;
|
||||
self
|
||||
}
|
||||
async fn check_state(&mut self) -> anyhow::Result<()> {
|
||||
pub fn add_process(
|
||||
&mut self,
|
||||
proc_name: &'a str,
|
||||
trigger: Triggers<'a>,
|
||||
sender: MpscSender<'a>,
|
||||
) {
|
||||
// queue add
|
||||
if let Triggers::Service { wait, .. } = trigger {
|
||||
self.config.entry(wait)
|
||||
.and_modify(|el| el.push_back(proc_name))
|
||||
.or_insert({
|
||||
let mut temp = VecDeque::new();
|
||||
temp.push_back(proc_name);
|
||||
temp
|
||||
});
|
||||
}
|
||||
// event add
|
||||
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
|
||||
}
|
||||
async fn check_state(&self) -> anyhow::Result<()> {
|
||||
let mut addrs = self.access_url.to_socket_addrs()?;
|
||||
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
|
||||
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn trigger_on(&mut self) {}
|
||||
async fn trigger_on(&'a mut self) {
|
||||
match self.state {
|
||||
ServiceState::Ok => {
|
||||
let _ = self.event_registrator
|
||||
.iter()
|
||||
.map(|(_, (_, el))| async {
|
||||
let _ = el.send(Events::Positive(&self.access_url)).await;
|
||||
});
|
||||
},
|
||||
ServiceState::Unavailable => {
|
||||
// looped check and notifying
|
||||
self.looped_check().await;
|
||||
},
|
||||
}
|
||||
}
|
||||
async fn looped_check(self: &'a mut Self) {
|
||||
let longest = self.config.last_entry().unwrap();
|
||||
let longest = longest.key();
|
||||
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
|
||||
let timer = tokio::time::Instant::now();
|
||||
let mut attempt: u32 = 1;
|
||||
let access_url = Arc::new(self.access_url.clone());
|
||||
// let event_registrator = &mut self.event_registrator;
|
||||
|
||||
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
|
||||
// let access_url = access_url.clone();
|
||||
loop {
|
||||
interapter.tick().await;
|
||||
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
|
||||
attempt += 1;
|
||||
|
||||
let state_check_result = self.check_state().await;
|
||||
|
||||
if state_check_result.is_ok() {
|
||||
info!("Connection to {} is `OK` now", &access_url);
|
||||
self.state = ServiceState::Ok;
|
||||
break;
|
||||
} else {
|
||||
let now = timer.elapsed();
|
||||
let iterator = self.config.iter()
|
||||
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
|
||||
.flat_map(|(_, a)| a.iter().copied())
|
||||
.collect::<VecDeque<&str>>();
|
||||
|
||||
for name in iterator {
|
||||
let sender_opt = self.event_registrator.get(name)
|
||||
.map(|(trigger, sender)|
|
||||
(trigger.to_service_negative_event(name), sender)
|
||||
);
|
||||
|
||||
if let Some((tr, tx)) = sender_opt {
|
||||
let _ = tx.send(tr.unwrap()).await;
|
||||
} else {
|
||||
error!("Cannot find {} channel sender in {} service", name, &self.access_url)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}).await {
|
||||
info!("Timeout of establishing connection to {}. ", &access_url);
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<'a> ProcessUnit<'a> for ServicesController<'a> {
|
||||
async fn process(&mut self) {
|
||||
async fn process(&'a mut self) {
|
||||
// check_service(hostname, port)
|
||||
let current_state = self.check_state().await;
|
||||
match (&self.state, current_state) {
|
||||
(ServiceState::Unavailable, Ok(_)) => {
|
||||
warn!("Unreachable for connection service `{}`. Notifying {} process(es)", &self.access_url, self.event_registrator.len());
|
||||
//
|
||||
self.state = ServiceState::Unavailable;
|
||||
warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
|
||||
self.state = ServiceState::Ok;
|
||||
self.trigger_on().await;
|
||||
},
|
||||
(ServiceState::Ok, Err(_)) => {
|
||||
warn!("Connection with `{}` service was established. Notifying {} process(es)", &self.access_url, self.event_registrator.len());
|
||||
//
|
||||
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
|
||||
self.state = ServiceState::Unavailable;
|
||||
self.trigger_on().await;
|
||||
},
|
||||
(ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url),
|
||||
_ => { /* DEAD END WITH NO INTEREST */ },
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue