From 3c22a67052507031207b5b3b57fcf0138c4188bf Mon Sep 17 00:00:00 2001 From: prplV Date: Sun, 4 May 2025 08:59:55 -0400 Subject: [PATCH] OWNERSHIP FIX: services with ownership using Arc --- noxis-rs/src/utils/services.rs | 82 +++++++++++++++++----------------- 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index 9cec823..0b048e5 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -3,8 +3,7 @@ use super::prcs::{is_active, is_frozen}; use log::{error, warn}; use std::net::{TcpStream, ToSocketAddrs}; use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::time::{Duration, Instant}; +use tokio::time::Duration; use tokio::sync::mpsc::Sender as Sender; use async_trait::async_trait; @@ -16,38 +15,38 @@ pub mod v2 { use super::*; use std::collections::{HashMap, BTreeMap, VecDeque}; - type MpscSender<'a> = Arc>>; + type MpscSender = Arc>; // type EventHandlers<'a> = Vec>>; - type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; + type EventHandlers = HashMap; // type wrapper for service wait queue - type ConnectionQueue<'a> = BTreeMap>; + type ConnectionQueue = BTreeMap>>; #[derive(Debug)] - pub struct ServicesController<'a> { + pub struct ServicesController { // i.e. yandex.ru #[allow(unused)] - name : &'a str, + name : String, // i.e. yandex.ru:443 - access_url : String, + access_url : Arc, // "OK" or "Unavailable" state: ServiceState, // btree map with key as max wait time and it's key to hashmap - config: ConnectionQueue<'a>, + config: ConnectionQueue, // Map of processes with their (trigger and mpsc sender) - event_registrator : EventHandlers<'a>, + event_registrator : EventHandlers, } - impl<'a> PartialEq for ServicesController<'a> { + impl PartialEq for ServicesController { fn eq(&self, other: &Self) -> bool { self.access_url == other.access_url } } - impl<'a> ServicesController<'a> { - pub fn new() -> ServicesController<'a> { + impl ServicesController { + pub fn new() -> ServicesController { ServicesController { - name : "", - access_url : String::new(), + name : String::new(), + access_url : Arc::from(String::new()), state : ServiceState::Unavailable, config: ConnectionQueue::new(), event_registrator : EventHandlers::new(), @@ -55,45 +54,46 @@ pub mod v2 { } pub fn with_access_name( mut self, - hostname: &'a str, - access_url: String, - ) -> ServicesController<'a> { - self.name = hostname; - self.access_url = access_url; + hostname: &str, + access_url: &str, + ) -> ServicesController { + self.name = hostname.to_string(); + self.access_url = Arc::from(access_url); self } pub fn with_params( mut self, - conn_queue: ConnectionQueue<'a>, - event_reg: EventHandlers<'a>, - ) -> ServicesController<'a> { + conn_queue: ConnectionQueue, + event_reg: EventHandlers, + ) -> ServicesController { self.config = conn_queue; self.event_registrator = event_reg; self } - pub fn get_access_url(hostname: &'a str, port: Option<&u32>) -> String { + pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String { format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) } pub fn add_process( &mut self, - proc_name: &'a str, - trigger: Triggers<'a>, - sender: MpscSender<'a>, + proc_name: &str, + trigger: Triggers, + sender: MpscSender, ) { + let proc_name: Arc = Arc::from(proc_name); // queue add if let Triggers::Service { wait, .. } = trigger { self.config.entry(wait) - .and_modify(|el| el.push_back(proc_name)) + .and_modify(|el| el.push_back(proc_name.clone())) .or_insert({ let mut temp = VecDeque::new(); - temp.push_back(proc_name); + temp.push_back(proc_name.clone()); temp }); } // event add - self.event_registrator.entry(proc_name).or_insert((trigger, sender)); + self.event_registrator.entry(proc_name.to_string()).or_insert((trigger, sender)); } async fn check_state(&self) -> anyhow::Result<()> { let mut addrs = self.access_url.to_socket_addrs()?; @@ -102,13 +102,13 @@ pub mod v2 { } Ok(()) } - async fn trigger_on(&'a mut self) { + async fn trigger_on(&mut self) { match self.state { ServiceState::Ok => { let _ = self.event_registrator .iter() .map(|(_, (_, el))| async { - let _ = el.send(Events::Positive(&self.access_url)).await; + let _ = el.send(Events::Positive(self.access_url.clone())).await; }); }, ServiceState::Unavailable => { @@ -117,7 +117,7 @@ pub mod v2 { }, } } - async fn looped_check(self: &'a mut Self) { + async fn looped_check(self: &mut Self) { let longest = self.config.last_entry().unwrap(); let longest = longest.key(); let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1)); @@ -143,19 +143,21 @@ pub mod v2 { let now = timer.elapsed(); let iterator = self.config.iter() .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now) - .flat_map(|(_, a)| a.iter().copied()) - .collect::>(); + .flat_map(|(_, a)| a.iter().cloned()) + .collect::>>(); for name in iterator { - let sender_opt = self.event_registrator.get(name) + let proc_name = name.to_string(); + info!("Trying to notify process `{}` ...", &proc_name); + let sender_opt = self.event_registrator.get(&proc_name) .map(|(trigger, sender)| - (trigger.to_service_negative_event(name), sender) + (trigger.to_service_negative_event(name.clone()), sender) ); if let Some((tr, tx)) = sender_opt { let _ = tx.send(tr.unwrap()).await; } else { - error!("Cannot find {} channel sender in {} service", name, &self.access_url) + error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url) } } } @@ -166,8 +168,8 @@ pub mod v2 { } } #[async_trait] - impl<'a> ProcessUnit<'a> for ServicesController<'a> { - async fn process(&'a mut self) { + impl ProcessUnit for ServicesController { + async fn process(&mut self) { // check_service(hostname, port) let current_state = self.check_state().await; match (&self.state, current_state) {