Compare commits

..

2 Commits

Author SHA1 Message Date
prplV 3c22a67052 OWNERSHIP FIX: services with ownership using Arc<str> 2025-05-04 08:59:55 -04:00
prplV 052448a7b9 OWNERSHIP FIX: structs with Arc<str> (instead of &'_ str) 2025-05-04 08:59:25 -04:00
2 changed files with 66 additions and 63 deletions

View File

@ -3,6 +3,7 @@
use std::net::Ipv4Addr;
use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use std::sync::Arc;
pub enum DependencyType {
File,
@ -37,44 +38,44 @@ impl std::fmt::Display for FileTriggerType {
}
impl<'a> FileTriggerType {
pub fn event(&self, file_name: &'a str, trigger: &'a str) -> Events<'a> {
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)),
}
}
pub fn event_from_file_trigger_controller(&self, file_name: &'a str, trigger: &FileTriggersForController<'a>) -> Events<'a> {
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events {
return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete)),
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete.clone())),
}
}
}
#[derive(Debug)]
pub enum Triggers<'a> {
File{ on_change: &'a str, on_delete: &'a str },
Service{on_lost: &'a str, wait: u32},
pub enum Triggers {
File { on_change: Arc<str>, on_delete: Arc<str> },
Service {on_lost: Arc<str>, wait: u32},
}
impl<'a> Triggers<'a> {
pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> {
impl Triggers {
pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers {
Triggers::File { on_change, on_delete }
}
pub fn new_service(on_lost: &'a str, wait_time: u32) -> Triggers<'a> {
pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers {
Triggers::Service{on_lost, wait: wait_time}
}
pub fn to_service_negative_event(&'a self, service_name: &'a str) -> Option<Events<'a>> {
pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> {
if let Triggers::Service { on_lost, .. } = self {
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, &on_lost)))
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone())))
}
None
}
}
#[derive(Debug)]
pub struct FileTriggersForController<'a> { pub on_change: &'a str, pub on_delete: &'a str }
pub struct ServiceTriggersForController<'a>(&'a str);
pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> }
pub struct ServiceTriggersForController(Arc<str>);
impl std::fmt::Display for DependencyType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@ -92,19 +93,19 @@ pub enum ProcessState {
Stopped,
StoppedByCli,
}
pub enum Events<'a> {
Positive(&'a str),
Negative(NegativeOutcomes<'a>)
pub enum Events {
Positive(Arc<str>),
Negative(NegativeOutcomes)
}
pub enum NegativeOutcomes<'a> {
FileWasChanged(&'a str, DependencyType, &'a str),
FileWasMovedOrDeleted(&'a str, DependencyType, &'a str),
ServiceIsUnreachable(&'a str, DependencyType, &'a str),
pub enum NegativeOutcomes {
FileWasChanged(Arc<str>, DependencyType, Arc<str>),
FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>),
ServiceIsUnreachable(Arc<str>, DependencyType, Arc<str>),
}
#[async_trait]
pub trait ProcessUnit<'a> {
async fn process(&'a mut self);
pub trait ProcessUnit {
async fn process(&mut self);
}
/// # an Error enum (next will be deleted and replaced)
pub enum CustomError {

View File

@ -3,8 +3,7 @@ use super::prcs::{is_active, is_frozen};
use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use tokio::time::Duration;
use tokio::sync::mpsc::Sender as Sender;
use async_trait::async_trait;
@ -16,38 +15,38 @@ pub mod v2 {
use super::*;
use std::collections::{HashMap, BTreeMap, VecDeque};
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
type MpscSender = Arc<Sender<Events>>;
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
type EventHandlers = HashMap<String, (Triggers, MpscSender)>;
// type wrapper for service wait queue
type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>;
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
#[derive(Debug)]
pub struct ServicesController<'a> {
pub struct ServicesController {
// i.e. yandex.ru
#[allow(unused)]
name : &'a str,
name : String,
// i.e. yandex.ru:443
access_url : String,
access_url : Arc<str>,
// "OK" or "Unavailable"
state: ServiceState,
// btree map with key as max wait time and it's key to hashmap
config: ConnectionQueue<'a>,
config: ConnectionQueue,
// Map of processes with their (trigger and mpsc sender)
event_registrator : EventHandlers<'a>,
event_registrator : EventHandlers,
}
impl<'a> PartialEq for ServicesController<'a> {
impl PartialEq for ServicesController {
fn eq(&self, other: &Self) -> bool {
self.access_url == other.access_url
}
}
impl<'a> ServicesController<'a> {
pub fn new() -> ServicesController<'a> {
impl ServicesController {
pub fn new() -> ServicesController {
ServicesController {
name : "",
access_url : String::new(),
name : String::new(),
access_url : Arc::from(String::new()),
state : ServiceState::Unavailable,
config: ConnectionQueue::new(),
event_registrator : EventHandlers::new(),
@ -55,45 +54,46 @@ pub mod v2 {
}
pub fn with_access_name(
mut self,
hostname: &'a str,
access_url: String,
) -> ServicesController<'a> {
self.name = hostname;
self.access_url = access_url;
hostname: &str,
access_url: &str,
) -> ServicesController {
self.name = hostname.to_string();
self.access_url = Arc::from(access_url);
self
}
pub fn with_params(
mut self,
conn_queue: ConnectionQueue<'a>,
event_reg: EventHandlers<'a>,
) -> ServicesController<'a> {
conn_queue: ConnectionQueue,
event_reg: EventHandlers,
) -> ServicesController {
self.config = conn_queue;
self.event_registrator = event_reg;
self
}
pub fn get_access_url(hostname: &'a str, port: Option<&u32>) -> String {
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
}
pub fn add_process(
&mut self,
proc_name: &'a str,
trigger: Triggers<'a>,
sender: MpscSender<'a>,
proc_name: &str,
trigger: Triggers,
sender: MpscSender,
) {
let proc_name: Arc<str> = Arc::from(proc_name);
// queue add
if let Triggers::Service { wait, .. } = trigger {
self.config.entry(wait)
.and_modify(|el| el.push_back(proc_name))
.and_modify(|el| el.push_back(proc_name.clone()))
.or_insert({
let mut temp = VecDeque::new();
temp.push_back(proc_name);
temp.push_back(proc_name.clone());
temp
});
}
// event add
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
self.event_registrator.entry(proc_name.to_string()).or_insert((trigger, sender));
}
async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?;
@ -102,13 +102,13 @@ pub mod v2 {
}
Ok(())
}
async fn trigger_on(&'a mut self) {
async fn trigger_on(&mut self) {
match self.state {
ServiceState::Ok => {
let _ = self.event_registrator
.iter()
.map(|(_, (_, el))| async {
let _ = el.send(Events::Positive(&self.access_url)).await;
let _ = el.send(Events::Positive(self.access_url.clone())).await;
});
},
ServiceState::Unavailable => {
@ -117,7 +117,7 @@ pub mod v2 {
},
}
}
async fn looped_check(self: &'a mut Self) {
async fn looped_check(self: &mut Self) {
let longest = self.config.last_entry().unwrap();
let longest = longest.key();
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
@ -143,19 +143,21 @@ pub mod v2 {
let now = timer.elapsed();
let iterator = self.config.iter()
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
.flat_map(|(_, a)| a.iter().copied())
.collect::<VecDeque<&str>>();
.flat_map(|(_, a)| a.iter().cloned())
.collect::<VecDeque<Arc<str>>>();
for name in iterator {
let sender_opt = self.event_registrator.get(name)
let proc_name = name.to_string();
info!("Trying to notify process `{}` ...", &proc_name);
let sender_opt = self.event_registrator.get(&proc_name)
.map(|(trigger, sender)|
(trigger.to_service_negative_event(name), sender)
(trigger.to_service_negative_event(name.clone()), sender)
);
if let Some((tr, tx)) = sender_opt {
let _ = tx.send(tr.unwrap()).await;
} else {
error!("Cannot find {} channel sender in {} service", name, &self.access_url)
error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url)
}
}
}
@ -166,8 +168,8 @@ pub mod v2 {
}
}
#[async_trait]
impl<'a> ProcessUnit<'a> for ServicesController<'a> {
async fn process(&'a mut self) {
impl ProcessUnit for ServicesController {
async fn process(&mut self) {
// check_service(hostname, port)
let current_state = self.check_state().await;
match (&self.state, current_state) {