Compare commits
2 Commits
6d56d1e39c
...
3c22a67052
| Author | SHA1 | Date |
|---|---|---|
|
|
3c22a67052 | |
|
|
052448a7b9 |
|
|
@ -3,6 +3,7 @@
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub enum DependencyType {
|
pub enum DependencyType {
|
||||||
File,
|
File,
|
||||||
|
|
@ -37,44 +38,44 @@ impl std::fmt::Display for FileTriggerType {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> FileTriggerType {
|
impl<'a> FileTriggerType {
|
||||||
pub fn event(&self, file_name: &'a str, trigger: &'a str) -> Events<'a> {
|
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
|
||||||
return match self {
|
return match self {
|
||||||
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
|
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
|
||||||
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
|
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn event_from_file_trigger_controller(&self, file_name: &'a str, trigger: &FileTriggersForController<'a>) -> Events<'a> {
|
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
|
||||||
return match self {
|
return match self {
|
||||||
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change)),
|
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
|
||||||
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete)),
|
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete.clone())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Triggers<'a> {
|
pub enum Triggers {
|
||||||
File{ on_change: &'a str, on_delete: &'a str },
|
File { on_change: Arc<str>, on_delete: Arc<str> },
|
||||||
Service{on_lost: &'a str, wait: u32},
|
Service {on_lost: Arc<str>, wait: u32},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Triggers<'a> {
|
impl Triggers {
|
||||||
pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> {
|
pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers {
|
||||||
Triggers::File { on_change, on_delete }
|
Triggers::File { on_change, on_delete }
|
||||||
}
|
}
|
||||||
pub fn new_service(on_lost: &'a str, wait_time: u32) -> Triggers<'a> {
|
pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers {
|
||||||
Triggers::Service{on_lost, wait: wait_time}
|
Triggers::Service{on_lost, wait: wait_time}
|
||||||
}
|
}
|
||||||
pub fn to_service_negative_event(&'a self, service_name: &'a str) -> Option<Events<'a>> {
|
pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> {
|
||||||
if let Triggers::Service { on_lost, .. } = self {
|
if let Triggers::Service { on_lost, .. } = self {
|
||||||
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, &on_lost)))
|
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone())))
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct FileTriggersForController<'a> { pub on_change: &'a str, pub on_delete: &'a str }
|
pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> }
|
||||||
pub struct ServiceTriggersForController<'a>(&'a str);
|
pub struct ServiceTriggersForController(Arc<str>);
|
||||||
|
|
||||||
impl std::fmt::Display for DependencyType {
|
impl std::fmt::Display for DependencyType {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
|
@ -92,19 +93,19 @@ pub enum ProcessState {
|
||||||
Stopped,
|
Stopped,
|
||||||
StoppedByCli,
|
StoppedByCli,
|
||||||
}
|
}
|
||||||
pub enum Events<'a> {
|
pub enum Events {
|
||||||
Positive(&'a str),
|
Positive(Arc<str>),
|
||||||
Negative(NegativeOutcomes<'a>)
|
Negative(NegativeOutcomes)
|
||||||
}
|
}
|
||||||
pub enum NegativeOutcomes<'a> {
|
pub enum NegativeOutcomes {
|
||||||
FileWasChanged(&'a str, DependencyType, &'a str),
|
FileWasChanged(Arc<str>, DependencyType, Arc<str>),
|
||||||
FileWasMovedOrDeleted(&'a str, DependencyType, &'a str),
|
FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>),
|
||||||
ServiceIsUnreachable(&'a str, DependencyType, &'a str),
|
ServiceIsUnreachable(Arc<str>, DependencyType, Arc<str>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait ProcessUnit<'a> {
|
pub trait ProcessUnit {
|
||||||
async fn process(&'a mut self);
|
async fn process(&mut self);
|
||||||
}
|
}
|
||||||
/// # an Error enum (next will be deleted and replaced)
|
/// # an Error enum (next will be deleted and replaced)
|
||||||
pub enum CustomError {
|
pub enum CustomError {
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,7 @@ use super::prcs::{is_active, is_frozen};
|
||||||
use log::{error, warn};
|
use log::{error, warn};
|
||||||
use std::net::{TcpStream, ToSocketAddrs};
|
use std::net::{TcpStream, ToSocketAddrs};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::time::Duration;
|
||||||
use tokio::time::{Duration, Instant};
|
|
||||||
use tokio::sync::mpsc::Sender as Sender;
|
use tokio::sync::mpsc::Sender as Sender;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
|
@ -16,38 +15,38 @@ pub mod v2 {
|
||||||
use super::*;
|
use super::*;
|
||||||
use std::collections::{HashMap, BTreeMap, VecDeque};
|
use std::collections::{HashMap, BTreeMap, VecDeque};
|
||||||
|
|
||||||
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
|
type MpscSender = Arc<Sender<Events>>;
|
||||||
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
|
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
|
||||||
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
|
type EventHandlers = HashMap<String, (Triggers, MpscSender)>;
|
||||||
// type wrapper for service wait queue
|
// type wrapper for service wait queue
|
||||||
type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>;
|
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ServicesController<'a> {
|
pub struct ServicesController {
|
||||||
// i.e. yandex.ru
|
// i.e. yandex.ru
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
name : &'a str,
|
name : String,
|
||||||
// i.e. yandex.ru:443
|
// i.e. yandex.ru:443
|
||||||
access_url : String,
|
access_url : Arc<str>,
|
||||||
// "OK" or "Unavailable"
|
// "OK" or "Unavailable"
|
||||||
state: ServiceState,
|
state: ServiceState,
|
||||||
// btree map with key as max wait time and it's key to hashmap
|
// btree map with key as max wait time and it's key to hashmap
|
||||||
config: ConnectionQueue<'a>,
|
config: ConnectionQueue,
|
||||||
// Map of processes with their (trigger and mpsc sender)
|
// Map of processes with their (trigger and mpsc sender)
|
||||||
event_registrator : EventHandlers<'a>,
|
event_registrator : EventHandlers,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> PartialEq for ServicesController<'a> {
|
impl PartialEq for ServicesController {
|
||||||
fn eq(&self, other: &Self) -> bool {
|
fn eq(&self, other: &Self) -> bool {
|
||||||
self.access_url == other.access_url
|
self.access_url == other.access_url
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ServicesController<'a> {
|
impl ServicesController {
|
||||||
pub fn new() -> ServicesController<'a> {
|
pub fn new() -> ServicesController {
|
||||||
ServicesController {
|
ServicesController {
|
||||||
name : "",
|
name : String::new(),
|
||||||
access_url : String::new(),
|
access_url : Arc::from(String::new()),
|
||||||
state : ServiceState::Unavailable,
|
state : ServiceState::Unavailable,
|
||||||
config: ConnectionQueue::new(),
|
config: ConnectionQueue::new(),
|
||||||
event_registrator : EventHandlers::new(),
|
event_registrator : EventHandlers::new(),
|
||||||
|
|
@ -55,45 +54,46 @@ pub mod v2 {
|
||||||
}
|
}
|
||||||
pub fn with_access_name(
|
pub fn with_access_name(
|
||||||
mut self,
|
mut self,
|
||||||
hostname: &'a str,
|
hostname: &str,
|
||||||
access_url: String,
|
access_url: &str,
|
||||||
) -> ServicesController<'a> {
|
) -> ServicesController {
|
||||||
self.name = hostname;
|
self.name = hostname.to_string();
|
||||||
self.access_url = access_url;
|
self.access_url = Arc::from(access_url);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_params(
|
pub fn with_params(
|
||||||
mut self,
|
mut self,
|
||||||
conn_queue: ConnectionQueue<'a>,
|
conn_queue: ConnectionQueue,
|
||||||
event_reg: EventHandlers<'a>,
|
event_reg: EventHandlers,
|
||||||
) -> ServicesController<'a> {
|
) -> ServicesController {
|
||||||
self.config = conn_queue;
|
self.config = conn_queue;
|
||||||
self.event_registrator = event_reg;
|
self.event_registrator = event_reg;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_access_url(hostname: &'a str, port: Option<&u32>) -> String {
|
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
|
||||||
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
|
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
|
||||||
}
|
}
|
||||||
pub fn add_process(
|
pub fn add_process(
|
||||||
&mut self,
|
&mut self,
|
||||||
proc_name: &'a str,
|
proc_name: &str,
|
||||||
trigger: Triggers<'a>,
|
trigger: Triggers,
|
||||||
sender: MpscSender<'a>,
|
sender: MpscSender,
|
||||||
) {
|
) {
|
||||||
|
let proc_name: Arc<str> = Arc::from(proc_name);
|
||||||
// queue add
|
// queue add
|
||||||
if let Triggers::Service { wait, .. } = trigger {
|
if let Triggers::Service { wait, .. } = trigger {
|
||||||
self.config.entry(wait)
|
self.config.entry(wait)
|
||||||
.and_modify(|el| el.push_back(proc_name))
|
.and_modify(|el| el.push_back(proc_name.clone()))
|
||||||
.or_insert({
|
.or_insert({
|
||||||
let mut temp = VecDeque::new();
|
let mut temp = VecDeque::new();
|
||||||
temp.push_back(proc_name);
|
temp.push_back(proc_name.clone());
|
||||||
temp
|
temp
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// event add
|
// event add
|
||||||
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
|
self.event_registrator.entry(proc_name.to_string()).or_insert((trigger, sender));
|
||||||
}
|
}
|
||||||
async fn check_state(&self) -> anyhow::Result<()> {
|
async fn check_state(&self) -> anyhow::Result<()> {
|
||||||
let mut addrs = self.access_url.to_socket_addrs()?;
|
let mut addrs = self.access_url.to_socket_addrs()?;
|
||||||
|
|
@ -102,13 +102,13 @@ pub mod v2 {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn trigger_on(&'a mut self) {
|
async fn trigger_on(&mut self) {
|
||||||
match self.state {
|
match self.state {
|
||||||
ServiceState::Ok => {
|
ServiceState::Ok => {
|
||||||
let _ = self.event_registrator
|
let _ = self.event_registrator
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, (_, el))| async {
|
.map(|(_, (_, el))| async {
|
||||||
let _ = el.send(Events::Positive(&self.access_url)).await;
|
let _ = el.send(Events::Positive(self.access_url.clone())).await;
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
ServiceState::Unavailable => {
|
ServiceState::Unavailable => {
|
||||||
|
|
@ -117,7 +117,7 @@ pub mod v2 {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn looped_check(self: &'a mut Self) {
|
async fn looped_check(self: &mut Self) {
|
||||||
let longest = self.config.last_entry().unwrap();
|
let longest = self.config.last_entry().unwrap();
|
||||||
let longest = longest.key();
|
let longest = longest.key();
|
||||||
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
|
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
|
||||||
|
|
@ -143,19 +143,21 @@ pub mod v2 {
|
||||||
let now = timer.elapsed();
|
let now = timer.elapsed();
|
||||||
let iterator = self.config.iter()
|
let iterator = self.config.iter()
|
||||||
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
|
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
|
||||||
.flat_map(|(_, a)| a.iter().copied())
|
.flat_map(|(_, a)| a.iter().cloned())
|
||||||
.collect::<VecDeque<&str>>();
|
.collect::<VecDeque<Arc<str>>>();
|
||||||
|
|
||||||
for name in iterator {
|
for name in iterator {
|
||||||
let sender_opt = self.event_registrator.get(name)
|
let proc_name = name.to_string();
|
||||||
|
info!("Trying to notify process `{}` ...", &proc_name);
|
||||||
|
let sender_opt = self.event_registrator.get(&proc_name)
|
||||||
.map(|(trigger, sender)|
|
.map(|(trigger, sender)|
|
||||||
(trigger.to_service_negative_event(name), sender)
|
(trigger.to_service_negative_event(name.clone()), sender)
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some((tr, tx)) = sender_opt {
|
if let Some((tr, tx)) = sender_opt {
|
||||||
let _ = tx.send(tr.unwrap()).await;
|
let _ = tx.send(tr.unwrap()).await;
|
||||||
} else {
|
} else {
|
||||||
error!("Cannot find {} channel sender in {} service", name, &self.access_url)
|
error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -166,8 +168,8 @@ pub mod v2 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<'a> ProcessUnit<'a> for ServicesController<'a> {
|
impl ProcessUnit for ServicesController {
|
||||||
async fn process(&'a mut self) {
|
async fn process(&mut self) {
|
||||||
// check_service(hostname, port)
|
// check_service(hostname, port)
|
||||||
let current_state = self.check_state().await;
|
let current_state = self.check_state().await;
|
||||||
match (&self.state, current_state) {
|
match (&self.state, current_state) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue