Compare commits

...

2 Commits

Author SHA1 Message Date
prplV 0d68efd461 service controller 2025-04-18 08:44:35 -04:00
prplV 4fc90300fc file fix 2025-04-18 08:44:16 -04:00
3 changed files with 40 additions and 11 deletions

View File

@ -8,11 +8,15 @@ pub enum DependencyType {
Service, Service,
} }
pub struct ServiceWaitConfig { wait: u32, delay: u32} pub enum ServiceState {
Ok,
Unavailable
}
pub struct ServiceWaitConfig(u32);
impl Default for ServiceWaitConfig { impl Default for ServiceWaitConfig {
fn default() -> Self { fn default() -> Self {
Self { wait: 0, delay: 5 } Self(5)
} }
} }

View File

@ -19,8 +19,6 @@ pub mod v2 {
// type EventHandlers<'a> = HashMap<service name, sender object> // type EventHandlers<'a> = HashMap<service name, sender object>
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>; type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
//
type FileTriggersWrapper<'a> = HashMap<&'a str, Triggers<'a>>;
struct FilesController<'a> { struct FilesController<'a> {
name: &'a str, name: &'a str,
@ -54,12 +52,12 @@ pub mod v2 {
} }
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) { async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
let _ = self.triggers.iter() let _ = self.triggers.iter()
.map(|(_, (triggers, channel))| async { .map(|(prc_name, (triggers, channel))| async {
let _ = channel.send({ let _ = channel.send({
match &trigger_type { match &trigger_type {
None => Events::Positive(self.name), None => Events::Positive(self.name),
Some(event) => { Some(event) => {
info!("Event on {} ({}) : {}", self.name, &self.path, event); info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
event.event_from_file_trigger_controller(self.name, triggers) event.event_from_file_trigger_controller(self.name, triggers)
}, },
} }
@ -94,7 +92,8 @@ pub mod v2 {
}, },
} }
} }
self.trigger_on(Some(FileTriggerType::OnChange)).await; self.trigger_on(Some(FileTriggerType::OnChange)).await;
return;
} }
} }
}, },

View File

@ -8,17 +8,20 @@ use tokio::time::{Duration, Instant};
use tokio::sync::mpsc::Sender as MpscSender; use tokio::sync::mpsc::Sender as MpscSender;
pub mod v2 { pub mod v2 {
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceWaitConfig}; use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceWaitConfig, ServiceState};
use super::*; use super::*;
use std::collections::HashMap; use std::collections::{HashMap, BTreeMap, VecDeque};
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>; // type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>; type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
// type wrapper for service wait queue
type ConnectionQueue<'a> = BTreeMap<u8, VecDeque<(&'a str, Triggers<'a>, MpscSender<Events<'a>>)>>;
struct ServicesController<'a> { struct ServicesController<'a> {
name : &'a str, name : &'a str,
access_url : String, access_url : String,
state: ServiceState,
config: ServiceWaitConfig, config: ServiceWaitConfig,
event_registrator : EventHandlers<'a>, event_registrator : EventHandlers<'a>,
} }
@ -27,6 +30,7 @@ pub mod v2 {
ServicesController { ServicesController {
name : "", name : "",
access_url : String::new(), access_url : String::new(),
state : ServiceState::Unavailable,
config: ServiceWaitConfig::default(), config: ServiceWaitConfig::default(),
event_registrator : EventHandlers::new(), event_registrator : EventHandlers::new(),
} }
@ -37,10 +41,32 @@ pub mod v2 {
self.event_registrator = event_registrator; self.event_registrator = event_registrator;
Ok(()) Ok(())
} }
async fn check_state(&mut self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?;
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
}
Ok(())
}
async fn trigger_on(&mut self) {}
} }
impl<'a> ProcessUnit<'a> for ServicesController<'a> { impl<'a> ProcessUnit<'a> for ServicesController<'a> {
async fn process(&mut self) { async fn process(&mut self) {
// check_service(hostname, port)
let current_state = self.check_state().await;
match (&self.state, current_state) {
(ServiceState::Unavailable, Ok(_)) => {
warn!("Unreachable for connection service `{}`. Notifying {} process(es)", &self.access_url, self.event_registrator.len());
//
self.state = ServiceState::Unavailable;
},
(ServiceState::Ok, Err(_)) => {
warn!("Connection with `{}` service was established. Notifying {} process(es)", &self.access_url, self.event_registrator.len());
//
self.state = ServiceState::Unavailable;
},
_ => { /* DEAD END WITH NO INTEREST */ },
}
} }
} }
} }