Compare commits

..

No commits in common. "3c22a67052507031207b5b3b57fcf0138c4188bf" and "6d56d1e39c38cc9ca17433d68420a680210fe313" have entirely different histories.

2 changed files with 63 additions and 66 deletions

View File

@ -3,7 +3,6 @@
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use async_trait::async_trait; use async_trait::async_trait;
use std::sync::Arc;
pub enum DependencyType { pub enum DependencyType {
File, File,
@ -38,44 +37,44 @@ impl std::fmt::Display for FileTriggerType {
} }
impl<'a> FileTriggerType { impl<'a> FileTriggerType {
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events { pub fn event(&self, file_name: &'a str, trigger: &'a str) -> Events<'a> {
return match self { return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)), FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)), FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
} }
} }
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events { pub fn event_from_file_trigger_controller(&self, file_name: &'a str, trigger: &FileTriggersForController<'a>) -> Events<'a> {
return match self { return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())), FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete.clone())), FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete)),
} }
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub enum Triggers { pub enum Triggers<'a> {
File { on_change: Arc<str>, on_delete: Arc<str> }, File{ on_change: &'a str, on_delete: &'a str },
Service {on_lost: Arc<str>, wait: u32}, Service{on_lost: &'a str, wait: u32},
} }
impl Triggers { impl<'a> Triggers<'a> {
pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers { pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> {
Triggers::File { on_change, on_delete } Triggers::File { on_change, on_delete }
} }
pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers { pub fn new_service(on_lost: &'a str, wait_time: u32) -> Triggers<'a> {
Triggers::Service{on_lost, wait: wait_time} Triggers::Service{on_lost, wait: wait_time}
} }
pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> { pub fn to_service_negative_event(&'a self, service_name: &'a str) -> Option<Events<'a>> {
if let Triggers::Service { on_lost, .. } = self { if let Triggers::Service { on_lost, .. } = self {
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone()))) return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, &on_lost)))
} }
None None
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> } pub struct FileTriggersForController<'a> { pub on_change: &'a str, pub on_delete: &'a str }
pub struct ServiceTriggersForController(Arc<str>); pub struct ServiceTriggersForController<'a>(&'a str);
impl std::fmt::Display for DependencyType { impl std::fmt::Display for DependencyType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@ -93,19 +92,19 @@ pub enum ProcessState {
Stopped, Stopped,
StoppedByCli, StoppedByCli,
} }
pub enum Events { pub enum Events<'a> {
Positive(Arc<str>), Positive(&'a str),
Negative(NegativeOutcomes) Negative(NegativeOutcomes<'a>)
} }
pub enum NegativeOutcomes { pub enum NegativeOutcomes<'a> {
FileWasChanged(Arc<str>, DependencyType, Arc<str>), FileWasChanged(&'a str, DependencyType, &'a str),
FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>), FileWasMovedOrDeleted(&'a str, DependencyType, &'a str),
ServiceIsUnreachable(Arc<str>, DependencyType, Arc<str>), ServiceIsUnreachable(&'a str, DependencyType, &'a str),
} }
#[async_trait] #[async_trait]
pub trait ProcessUnit { pub trait ProcessUnit<'a> {
async fn process(&mut self); async fn process(&'a mut self);
} }
/// # an Error enum (next will be deleted and replaced) /// # an Error enum (next will be deleted and replaced)
pub enum CustomError { pub enum CustomError {

View File

@ -3,7 +3,8 @@ use super::prcs::{is_active, is_frozen};
use log::{error, warn}; use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc; use std::sync::Arc;
use tokio::time::Duration; use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use tokio::sync::mpsc::Sender as Sender; use tokio::sync::mpsc::Sender as Sender;
use async_trait::async_trait; use async_trait::async_trait;
@ -15,38 +16,38 @@ pub mod v2 {
use super::*; use super::*;
use std::collections::{HashMap, BTreeMap, VecDeque}; use std::collections::{HashMap, BTreeMap, VecDeque};
type MpscSender = Arc<Sender<Events>>; type MpscSender<'a> = Arc<Sender<Events<'a>>>;
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>; // type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers = HashMap<String, (Triggers, MpscSender)>; type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
// type wrapper for service wait queue // type wrapper for service wait queue
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>; type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>;
#[derive(Debug)] #[derive(Debug)]
pub struct ServicesController { pub struct ServicesController<'a> {
// i.e. yandex.ru // i.e. yandex.ru
#[allow(unused)] #[allow(unused)]
name : String, name : &'a str,
// i.e. yandex.ru:443 // i.e. yandex.ru:443
access_url : Arc<str>, access_url : String,
// "OK" or "Unavailable" // "OK" or "Unavailable"
state: ServiceState, state: ServiceState,
// btree map with key as max wait time and it's key to hashmap // btree map with key as max wait time and it's key to hashmap
config: ConnectionQueue, config: ConnectionQueue<'a>,
// Map of processes with their (trigger and mpsc sender) // Map of processes with their (trigger and mpsc sender)
event_registrator : EventHandlers, event_registrator : EventHandlers<'a>,
} }
impl PartialEq for ServicesController { impl<'a> PartialEq for ServicesController<'a> {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.access_url == other.access_url self.access_url == other.access_url
} }
} }
impl ServicesController { impl<'a> ServicesController<'a> {
pub fn new() -> ServicesController { pub fn new() -> ServicesController<'a> {
ServicesController { ServicesController {
name : String::new(), name : "",
access_url : Arc::from(String::new()), access_url : String::new(),
state : ServiceState::Unavailable, state : ServiceState::Unavailable,
config: ConnectionQueue::new(), config: ConnectionQueue::new(),
event_registrator : EventHandlers::new(), event_registrator : EventHandlers::new(),
@ -54,46 +55,45 @@ pub mod v2 {
} }
pub fn with_access_name( pub fn with_access_name(
mut self, mut self,
hostname: &str, hostname: &'a str,
access_url: &str, access_url: String,
) -> ServicesController { ) -> ServicesController<'a> {
self.name = hostname.to_string(); self.name = hostname;
self.access_url = Arc::from(access_url); self.access_url = access_url;
self self
} }
pub fn with_params( pub fn with_params(
mut self, mut self,
conn_queue: ConnectionQueue, conn_queue: ConnectionQueue<'a>,
event_reg: EventHandlers, event_reg: EventHandlers<'a>,
) -> ServicesController { ) -> ServicesController<'a> {
self.config = conn_queue; self.config = conn_queue;
self.event_registrator = event_reg; self.event_registrator = event_reg;
self self
} }
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String { pub fn get_access_url(hostname: &'a str, port: Option<&u32>) -> String {
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
} }
pub fn add_process( pub fn add_process(
&mut self, &mut self,
proc_name: &str, proc_name: &'a str,
trigger: Triggers, trigger: Triggers<'a>,
sender: MpscSender, sender: MpscSender<'a>,
) { ) {
let proc_name: Arc<str> = Arc::from(proc_name);
// queue add // queue add
if let Triggers::Service { wait, .. } = trigger { if let Triggers::Service { wait, .. } = trigger {
self.config.entry(wait) self.config.entry(wait)
.and_modify(|el| el.push_back(proc_name.clone())) .and_modify(|el| el.push_back(proc_name))
.or_insert({ .or_insert({
let mut temp = VecDeque::new(); let mut temp = VecDeque::new();
temp.push_back(proc_name.clone()); temp.push_back(proc_name);
temp temp
}); });
} }
// event add // event add
self.event_registrator.entry(proc_name.to_string()).or_insert((trigger, sender)); self.event_registrator.entry(proc_name).or_insert((trigger, sender));
} }
async fn check_state(&self) -> anyhow::Result<()> { async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?; let mut addrs = self.access_url.to_socket_addrs()?;
@ -102,13 +102,13 @@ pub mod v2 {
} }
Ok(()) Ok(())
} }
async fn trigger_on(&mut self) { async fn trigger_on(&'a mut self) {
match self.state { match self.state {
ServiceState::Ok => { ServiceState::Ok => {
let _ = self.event_registrator let _ = self.event_registrator
.iter() .iter()
.map(|(_, (_, el))| async { .map(|(_, (_, el))| async {
let _ = el.send(Events::Positive(self.access_url.clone())).await; let _ = el.send(Events::Positive(&self.access_url)).await;
}); });
}, },
ServiceState::Unavailable => { ServiceState::Unavailable => {
@ -117,7 +117,7 @@ pub mod v2 {
}, },
} }
} }
async fn looped_check(self: &mut Self) { async fn looped_check(self: &'a mut Self) {
let longest = self.config.last_entry().unwrap(); let longest = self.config.last_entry().unwrap();
let longest = longest.key(); let longest = longest.key();
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1)); let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
@ -143,21 +143,19 @@ pub mod v2 {
let now = timer.elapsed(); let now = timer.elapsed();
let iterator = self.config.iter() let iterator = self.config.iter()
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now) .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
.flat_map(|(_, a)| a.iter().cloned()) .flat_map(|(_, a)| a.iter().copied())
.collect::<VecDeque<Arc<str>>>(); .collect::<VecDeque<&str>>();
for name in iterator { for name in iterator {
let proc_name = name.to_string(); let sender_opt = self.event_registrator.get(name)
info!("Trying to notify process `{}` ...", &proc_name);
let sender_opt = self.event_registrator.get(&proc_name)
.map(|(trigger, sender)| .map(|(trigger, sender)|
(trigger.to_service_negative_event(name.clone()), sender) (trigger.to_service_negative_event(name), sender)
); );
if let Some((tr, tx)) = sender_opt { if let Some((tr, tx)) = sender_opt {
let _ = tx.send(tr.unwrap()).await; let _ = tx.send(tr.unwrap()).await;
} else { } else {
error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url) error!("Cannot find {} channel sender in {} service", name, &self.access_url)
} }
} }
} }
@ -168,8 +166,8 @@ pub mod v2 {
} }
} }
#[async_trait] #[async_trait]
impl ProcessUnit for ServicesController { impl<'a> ProcessUnit<'a> for ServicesController<'a> {
async fn process(&mut self) { async fn process(&'a mut self) {
// check_service(hostname, port) // check_service(hostname, port)
let current_state = self.check_state().await; let current_state = self.check_state().await;
match (&self.state, current_state) { match (&self.state, current_state) {