Compare commits

...

2 Commits

Author SHA1 Message Date
prplV 3c22a67052 OWNERSHIP FIX: services with ownership using Arc<str> 2025-05-04 08:59:55 -04:00
prplV 052448a7b9 OWNERSHIP FIX: structs with Arc<str> (instead of &'_ str) 2025-05-04 08:59:25 -04:00
2 changed files with 66 additions and 63 deletions

View File

@ -3,6 +3,7 @@
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use async_trait::async_trait; use async_trait::async_trait;
use std::sync::Arc;
pub enum DependencyType { pub enum DependencyType {
File, File,
@ -37,44 +38,44 @@ impl std::fmt::Display for FileTriggerType {
} }
impl<'a> FileTriggerType { impl<'a> FileTriggerType {
pub fn event(&self, file_name: &'a str, trigger: &'a str) -> Events<'a> { pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
return match self { return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)), FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)), FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
} }
} }
pub fn event_from_file_trigger_controller(&self, file_name: &'a str, trigger: &FileTriggersForController<'a>) -> Events<'a> { pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
return match self { return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change)), FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete)), FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete.clone())),
} }
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub enum Triggers<'a> { pub enum Triggers {
File{ on_change: &'a str, on_delete: &'a str }, File { on_change: Arc<str>, on_delete: Arc<str> },
Service{on_lost: &'a str, wait: u32}, Service {on_lost: Arc<str>, wait: u32},
} }
impl<'a> Triggers<'a> { impl Triggers {
pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> { pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers {
Triggers::File { on_change, on_delete } Triggers::File { on_change, on_delete }
} }
pub fn new_service(on_lost: &'a str, wait_time: u32) -> Triggers<'a> { pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers {
Triggers::Service{on_lost, wait: wait_time} Triggers::Service{on_lost, wait: wait_time}
} }
pub fn to_service_negative_event(&'a self, service_name: &'a str) -> Option<Events<'a>> { pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> {
if let Triggers::Service { on_lost, .. } = self { if let Triggers::Service { on_lost, .. } = self {
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, &on_lost))) return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone())))
} }
None None
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct FileTriggersForController<'a> { pub on_change: &'a str, pub on_delete: &'a str } pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> }
pub struct ServiceTriggersForController<'a>(&'a str); pub struct ServiceTriggersForController(Arc<str>);
impl std::fmt::Display for DependencyType { impl std::fmt::Display for DependencyType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@ -92,19 +93,19 @@ pub enum ProcessState {
Stopped, Stopped,
StoppedByCli, StoppedByCli,
} }
pub enum Events<'a> { pub enum Events {
Positive(&'a str), Positive(Arc<str>),
Negative(NegativeOutcomes<'a>) Negative(NegativeOutcomes)
} }
pub enum NegativeOutcomes<'a> { pub enum NegativeOutcomes {
FileWasChanged(&'a str, DependencyType, &'a str), FileWasChanged(Arc<str>, DependencyType, Arc<str>),
FileWasMovedOrDeleted(&'a str, DependencyType, &'a str), FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>),
ServiceIsUnreachable(&'a str, DependencyType, &'a str), ServiceIsUnreachable(Arc<str>, DependencyType, Arc<str>),
} }
#[async_trait] #[async_trait]
pub trait ProcessUnit<'a> { pub trait ProcessUnit {
async fn process(&'a mut self); async fn process(&mut self);
} }
/// # an Error enum (next will be deleted and replaced) /// # an Error enum (next will be deleted and replaced)
pub enum CustomError { pub enum CustomError {

View File

@ -3,8 +3,7 @@ use super::prcs::{is_active, is_frozen};
use log::{error, warn}; use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::time::Duration;
use tokio::time::{Duration, Instant};
use tokio::sync::mpsc::Sender as Sender; use tokio::sync::mpsc::Sender as Sender;
use async_trait::async_trait; use async_trait::async_trait;
@ -16,38 +15,38 @@ pub mod v2 {
use super::*; use super::*;
use std::collections::{HashMap, BTreeMap, VecDeque}; use std::collections::{HashMap, BTreeMap, VecDeque};
type MpscSender<'a> = Arc<Sender<Events<'a>>>; type MpscSender = Arc<Sender<Events>>;
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>; // type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; type EventHandlers = HashMap<String, (Triggers, MpscSender)>;
// type wrapper for service wait queue // type wrapper for service wait queue
type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>; type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
#[derive(Debug)] #[derive(Debug)]
pub struct ServicesController<'a> { pub struct ServicesController {
// i.e. yandex.ru // i.e. yandex.ru
#[allow(unused)] #[allow(unused)]
name : &'a str, name : String,
// i.e. yandex.ru:443 // i.e. yandex.ru:443
access_url : String, access_url : Arc<str>,
// "OK" or "Unavailable" // "OK" or "Unavailable"
state: ServiceState, state: ServiceState,
// btree map with key as max wait time and it's key to hashmap // btree map with key as max wait time and it's key to hashmap
config: ConnectionQueue<'a>, config: ConnectionQueue,
// Map of processes with their (trigger and mpsc sender) // Map of processes with their (trigger and mpsc sender)
event_registrator : EventHandlers<'a>, event_registrator : EventHandlers,
} }
impl<'a> PartialEq for ServicesController<'a> { impl PartialEq for ServicesController {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.access_url == other.access_url self.access_url == other.access_url
} }
} }
impl<'a> ServicesController<'a> { impl ServicesController {
pub fn new() -> ServicesController<'a> { pub fn new() -> ServicesController {
ServicesController { ServicesController {
name : "", name : String::new(),
access_url : String::new(), access_url : Arc::from(String::new()),
state : ServiceState::Unavailable, state : ServiceState::Unavailable,
config: ConnectionQueue::new(), config: ConnectionQueue::new(),
event_registrator : EventHandlers::new(), event_registrator : EventHandlers::new(),
@ -55,45 +54,46 @@ pub mod v2 {
} }
pub fn with_access_name( pub fn with_access_name(
mut self, mut self,
hostname: &'a str, hostname: &str,
access_url: String, access_url: &str,
) -> ServicesController<'a> { ) -> ServicesController {
self.name = hostname; self.name = hostname.to_string();
self.access_url = access_url; self.access_url = Arc::from(access_url);
self self
} }
pub fn with_params( pub fn with_params(
mut self, mut self,
conn_queue: ConnectionQueue<'a>, conn_queue: ConnectionQueue,
event_reg: EventHandlers<'a>, event_reg: EventHandlers,
) -> ServicesController<'a> { ) -> ServicesController {
self.config = conn_queue; self.config = conn_queue;
self.event_registrator = event_reg; self.event_registrator = event_reg;
self self
} }
pub fn get_access_url(hostname: &'a str, port: Option<&u32>) -> String { pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
} }
pub fn add_process( pub fn add_process(
&mut self, &mut self,
proc_name: &'a str, proc_name: &str,
trigger: Triggers<'a>, trigger: Triggers,
sender: MpscSender<'a>, sender: MpscSender,
) { ) {
let proc_name: Arc<str> = Arc::from(proc_name);
// queue add // queue add
if let Triggers::Service { wait, .. } = trigger { if let Triggers::Service { wait, .. } = trigger {
self.config.entry(wait) self.config.entry(wait)
.and_modify(|el| el.push_back(proc_name)) .and_modify(|el| el.push_back(proc_name.clone()))
.or_insert({ .or_insert({
let mut temp = VecDeque::new(); let mut temp = VecDeque::new();
temp.push_back(proc_name); temp.push_back(proc_name.clone());
temp temp
}); });
} }
// event add // event add
self.event_registrator.entry(proc_name).or_insert((trigger, sender)); self.event_registrator.entry(proc_name.to_string()).or_insert((trigger, sender));
} }
async fn check_state(&self) -> anyhow::Result<()> { async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?; let mut addrs = self.access_url.to_socket_addrs()?;
@ -102,13 +102,13 @@ pub mod v2 {
} }
Ok(()) Ok(())
} }
async fn trigger_on(&'a mut self) { async fn trigger_on(&mut self) {
match self.state { match self.state {
ServiceState::Ok => { ServiceState::Ok => {
let _ = self.event_registrator let _ = self.event_registrator
.iter() .iter()
.map(|(_, (_, el))| async { .map(|(_, (_, el))| async {
let _ = el.send(Events::Positive(&self.access_url)).await; let _ = el.send(Events::Positive(self.access_url.clone())).await;
}); });
}, },
ServiceState::Unavailable => { ServiceState::Unavailable => {
@ -117,7 +117,7 @@ pub mod v2 {
}, },
} }
} }
async fn looped_check(self: &'a mut Self) { async fn looped_check(self: &mut Self) {
let longest = self.config.last_entry().unwrap(); let longest = self.config.last_entry().unwrap();
let longest = longest.key(); let longest = longest.key();
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1)); let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
@ -143,19 +143,21 @@ pub mod v2 {
let now = timer.elapsed(); let now = timer.elapsed();
let iterator = self.config.iter() let iterator = self.config.iter()
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now) .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
.flat_map(|(_, a)| a.iter().copied()) .flat_map(|(_, a)| a.iter().cloned())
.collect::<VecDeque<&str>>(); .collect::<VecDeque<Arc<str>>>();
for name in iterator { for name in iterator {
let sender_opt = self.event_registrator.get(name) let proc_name = name.to_string();
info!("Trying to notify process `{}` ...", &proc_name);
let sender_opt = self.event_registrator.get(&proc_name)
.map(|(trigger, sender)| .map(|(trigger, sender)|
(trigger.to_service_negative_event(name), sender) (trigger.to_service_negative_event(name.clone()), sender)
); );
if let Some((tr, tx)) = sender_opt { if let Some((tr, tx)) = sender_opt {
let _ = tx.send(tr.unwrap()).await; let _ = tx.send(tr.unwrap()).await;
} else { } else {
error!("Cannot find {} channel sender in {} service", name, &self.access_url) error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url)
} }
} }
} }
@ -166,8 +168,8 @@ pub mod v2 {
} }
} }
#[async_trait] #[async_trait]
impl<'a> ProcessUnit<'a> for ServicesController<'a> { impl ProcessUnit for ServicesController {
async fn process(&'a mut self) { async fn process(&mut self) {
// check_service(hostname, port) // check_service(hostname, port)
let current_state = self.check_state().await; let current_state = self.check_state().await;
match (&self.state, current_state) { match (&self.state, current_state) {