big change but it's still not working in utils.rs

feature/configv2
prplV 2025-04-25 10:56:42 -04:00
parent 541b0f52dd
commit 6d56d1e39c
9 changed files with 308 additions and 218 deletions

View File

@ -18,3 +18,5 @@ sysinfo = "0.32.0"
tokio = { version = "1.38.0", features = ["full", "time"] } tokio = { version = "1.38.0", features = ["full", "time"] }
noxis-cli = { path = "../noxis-cli" } noxis-cli = { path = "../noxis-cli" }
dotenv = "0.15.0" dotenv = "0.15.0"
futures = "0.3.31"
async-trait = "0.1.88"

View File

@ -22,7 +22,6 @@
"port": 443, "port": 443,
"triggers": { "triggers": {
"wait": 10, "wait": 10,
"delay": 2,
"onLost": "restart" "onLost": "restart"
} }
} }

View File

@ -58,7 +58,19 @@ async fn main() -> anyhow::Result<()>{
handler.push(ctrlc); handler.push(ctrlc);
let monitoring = tokio::spawn(async move { let monitoring = tokio::spawn(async move {
if let Err(er) = init_monitoring(&mut rx_brd).await { let config = if !rx_brd.is_empty() {
rx_brd.recv().await?
} else {
let mut tick = tokio::time::interval(Duration::from_millis(500));
loop {
tick.tick().await;
break match rx_brd.try_recv() {
Ok(conf) => conf,
Err(_) => continue,
}
}
};
if let Err(er) = init_monitoring(config).await {
error!("Monitoring mod failed due to {}", er); error!("Monitoring mod failed due to {}", er);
} }
}); });

View File

@ -276,7 +276,7 @@ pub mod v2 {
// 100% local exists here // 100% local exists here
// create watcher on local config file // create watcher on local config file
match create_watcher("", local_config_path).await { match create_watcher("", local_config_path) {
Ok(mut watcher) => { Ok(mut watcher) => {
loop { loop {
let mut need_to_export_config = false; let mut need_to_export_config = false;
@ -340,7 +340,7 @@ pub mod v2 {
// recreation watcher (draining activity buffer mechanism) // recreation watcher (draining activity buffer mechanism)
// if local config file was deleted and recreated // if local config file was deleted and recreated
// if local config file was modified locally // if local config file was modified locally
match create_watcher("", local_config_path).await { match create_watcher("", local_config_path) {
Ok(new) => watcher = new, Ok(new) => watcher = new,
Err(er) => error!("Cannot create new watcher due to {}", er), Err(er) => error!("Cannot create new watcher due to {}", er),
} }

View File

@ -2,6 +2,7 @@
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use async_trait::async_trait;
pub enum DependencyType { pub enum DependencyType {
File, File,
@ -71,6 +72,7 @@ impl<'a> Triggers<'a> {
} }
} }
#[derive(Debug)]
pub struct FileTriggersForController<'a> { pub on_change: &'a str, pub on_delete: &'a str } pub struct FileTriggersForController<'a> { pub on_change: &'a str, pub on_delete: &'a str }
pub struct ServiceTriggersForController<'a>(&'a str); pub struct ServiceTriggersForController<'a>(&'a str);
@ -83,6 +85,7 @@ impl std::fmt::Display for DependencyType {
} }
} }
#[derive(Debug)]
pub enum ProcessState { pub enum ProcessState {
Pending, Pending,
Holding, Holding,
@ -99,10 +102,10 @@ pub enum NegativeOutcomes<'a> {
ServiceIsUnreachable(&'a str, DependencyType, &'a str), ServiceIsUnreachable(&'a str, DependencyType, &'a str),
} }
#[async_trait]
pub trait ProcessUnit<'a> { pub trait ProcessUnit<'a> {
fn process(&'a mut self) -> impl std::future::Future<Output = ()> + Send; async fn process(&'a mut self);
} }
/// # an Error enum (next will be deleted and replaced) /// # an Error enum (next will be deleted and replaced)
pub enum CustomError { pub enum CustomError {
Fatal, Fatal,
@ -251,7 +254,7 @@ pub struct Files {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Services { pub struct Services {
pub hostname: String, pub hostname: String,
pub port: u32, pub port: Option<u32>,
pub triggers: ServiceTriggers, pub triggers: ServiceTriggers,
} }
@ -275,7 +278,6 @@ pub struct Services {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServiceTriggers { pub struct ServiceTriggers {
pub wait: u32, pub wait: u32,
pub delay: u32,
#[serde(rename = "onLost")] #[serde(rename = "onLost")]
pub on_lost: String, pub on_lost: String,
} }

View File

@ -7,35 +7,42 @@ pub mod services;
// TODO : saving current flags state // TODO : saving current flags state
use crate::options::structs::{CustomError, TrackingProcess, Processes}; use crate::options::structs::{CustomError, TrackingProcess, Processes};
use files::create_watcher; // use files::create_watcher;
use files::file_handler; // use files::file_handler;
use inotify::Inotify; // use inotify::Inotify;
use log::{error, warn, info}; use log::{error, warn, info};
use prcs::{ use prcs::{
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process, freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
unfreeze_process, unfreeze_process,
}; };
use services::service_handler; // use services::service_handler;
use std::process::Command; use std::process::Command;
use std::sync::Arc; use std::sync::Arc;
use tokio::join; // use tokio::join;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::Duration; use tokio::time::Duration;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; // use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
// controllers import // controllers import
use prcs::v2::ProcessesController; use prcs::v2::ProcessesController;
use files::v2::FilesController; use files::v2::FilesController;
use services::v2::ServicesController; use services::v2::ServicesController;
use async_trait::async_trait;
const GET_ID_CMD: &str = "hostname"; const GET_ID_CMD: &str = "hostname";
pub mod v2 { pub mod v2 {
use std::collections::{HashMap, LinkedList}; use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit}; use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
use super::*; use super::*;
enum ControllerResult<'a> {
Process(Option<ProcessesController<'a>>),
File(Option<FilesController<'a>>),
Service(Option<ServicesController<'a>>),
}
#[derive(Debug)]
struct Supervisor<'a> { struct Supervisor<'a> {
prcs : LinkedList<ProcessesController<'a>>, prcs : LinkedList<ProcessesController<'a>>,
files : LinkedList<FilesController<'a>>, files : LinkedList<FilesController<'a>>,
@ -43,35 +50,92 @@ pub mod v2 {
} }
impl<'a> Supervisor<'a> { impl<'a> Supervisor<'a> {
pub fn new(config: &'a Processes) -> Supervisor<'a> { pub fn new() -> Supervisor<'a> {
let mut p = LinkedList::new(); Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()}
let mut f = LinkedList::new(); }
let mut s = LinkedList::new(); pub async fn with_config(mut self, config: &'a Processes) -> Supervisor<'a> {
let _ = config.processes.iter() let _ = config.processes.iter()
.map(|prc| { .for_each(|prc| {
let (rx, tx) = mpsc::channel::<Events<'a>>(10); let (rx, tx) = mpsc::channel::<Events<'a>>(10);
let temp = ProcessesController::new(&prc.name, tx); let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
if !p.contains(&temp) { if !self.prcs.contains(&temp) {
p.push_back(temp); self.prcs.push_back(temp);
} }
let rx = Arc::new(rx); let rx = Arc::new(rx);
// files
let _ = prc.dependencies.files.iter() let _ = prc.dependencies.files.iter()
.map(|file| async { .for_each(|file| {
let mut hm = HashMap::new(); let mut hm = HashMap::new();
let triggers = FileTriggersForController { on_change: &file.triggers.on_change, on_delete: &file.triggers.on_delete}; let triggers = FileTriggersForController { on_change: &file.triggers.on_change, on_delete: &file.triggers.on_delete};
hm.insert(&prc.name, (triggers, rx.clone())); hm.insert(prc.name.as_str(), (triggers, rx.clone()));
let tempfile = FilesController::new(&file.filename, hm).with_path(file.src).await;
let tempfile = FilesController::new(&file.filename.as_str(), hm)
.with_path(&file.src);
if let Ok(file) = tempfile {
if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) {
current_file.add_event(file);
} else {
self.files.push_back(file);
}
}
}); });
// servs // servs
let _ = prc.dependencies.services.iter() let _ = prc.dependencies.services.iter()
.map(|serv| { .for_each(|serv| {
let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref());
// preparations
let rx = rx.clone();
let serv_cont = ServicesController::new().with_access_name(
&serv.hostname,
access_url
);
// triggers
let triggers = Triggers::new_service(&serv.triggers.on_lost, serv.triggers.wait);
if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) {
proc.add_process(&prc.name, triggers, rx);
} else {
// vecdeque for queue
let mut vec: VecDeque<&'a str> = VecDeque::new();
vec.push_back(&prc.name);
// connection_queue
let mut connection_queue: BTreeMap<u32, VecDeque<&'a str>> = BTreeMap::new();
connection_queue.insert(serv.triggers.wait, vec);
// event_reg
let mut hm = HashMap::new();
hm.insert(prc.name.as_str(), (triggers, rx));
let serv_cont = serv_cont.with_params(connection_queue, hm);
self.services.push_back(serv_cont);
}
}); });
}); });
self
}
pub fn get_stats(&self) -> String {
format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len())
}
async fn proccess_prc<T>(&mut self) {
Supervisor { prcs: p, files: f, services: s } }
}
#[async_trait]
impl<'a> ProcessUnit<'a> for Supervisor<'a> {
async fn process(&'a mut self) {
info!("Initializing monitoring ...");
loop {
// let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
// let res = tokio::join!(prc.process(), file.process(), serv.process());
if let Some(mut val) = self.prcs.pop_front() {
tokio::spawn(async move {val.process().await;}).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
} }
} }
@ -81,23 +145,11 @@ pub mod v2 {
// spawn services // spawn services
// ## for ... i.await in loop // ## for ... i.await in loop
pub async fn init_monitoring( pub async fn init_monitoring(
local_config: &mut Receiver<Processes>, config: Processes
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let config = if !local_config.is_empty() { let mut supervisor = Supervisor::new().with_config(&config).await;
local_config.recv().await? info!("Monitoring: {} ", &supervisor.get_stats());
} else { supervisor.process().await;
let mut tick = tokio::time::interval(Duration::from_millis(500));
loop {
tick.tick().await;
break match local_config.try_recv() {
Ok(conf) => conf,
Err(_) => continue,
}
}
};
info!("Processing {} processes ...", config.processes.len());
// LinkedList <ProcessController>
// LinkedList <FileController>
Ok(()) Ok(())
} }
@ -140,37 +192,37 @@ pub mod v2 {
/// ///
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process /// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
/// ///
pub async fn run_daemons( // pub async fn run_daemons(
proc: Arc<TrackingProcess>, // proc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>, // tx: Arc<mpsc::Sender<u8>>,
rx: &mut mpsc::Receiver<u8>, // rx: &mut mpsc::Receiver<u8>,
) { // ) {
// creating watchers + ---buffers--- // // creating watchers + ---buffers---
let mut watchers: Vec<Inotify> = vec![]; // let mut watchers: Vec<Inotify> = vec![];
for file in proc.dependencies.files.clone().into_iter() { // for file in proc.dependencies.files.clone().into_iter() {
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { // if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
watchers.push(watcher); // watchers.push(watcher);
} else { // } else {
let _ = tx.send(121).await; // let _ = tx.send(121).await;
} // }
// watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); // // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
} // }
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> = // let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
Arc::new(tokio::sync::Mutex::new(watchers)); // Arc::new(tokio::sync::Mutex::new(watchers));
loop { // loop {
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); // let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
tokio::select! { // tokio::select! {
_ = run_hand => continue, // _ = run_hand => continue,
_val = rx.recv() => { // _val = rx.recv() => {
if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { // if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
return; // return;
} // }
}, // },
} // }
tokio::task::yield_now().await; // tokio::task::yield_now().await;
} // }
} // }
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
match val { match val {
@ -300,36 +352,36 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
/// ///
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` /// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
/// ///
pub async fn running_handler( // pub async fn running_handler(
prc: Arc<TrackingProcess>, // prc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>, // tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>, // watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) { // ) {
// services and files check (once) // // services and files check (once)
let files_check = file_handler( // let files_check = file_handler(
&prc.name, // &prc.name,
&prc.dependencies.files, // &prc.dependencies.files,
tx.clone(), // tx.clone(),
watchers.clone(), // watchers.clone(),
); // );
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); // let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
let res = join!(files_check, services_check); // let res = join!(files_check, services_check);
// if inactive -> spawn checks -> active is true // // if inactive -> spawn checks -> active is true
if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { // if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
if start_process(&prc.name, &prc.path).await.is_err() { // if start_process(&prc.name, &prc.path).await.is_err() {
tx.send(3).await.unwrap(); // tx.send(3).await.unwrap();
return; // return;
} // }
} // }
// if frozen -> spawn checks -> unfreeze is true // // if frozen -> spawn checks -> unfreeze is true
else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { // else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
tx.send(10).await.unwrap(); // tx.send(10).await.unwrap();
return; // return;
} // }
// tokio::time::sleep(Duration::from_millis(100)).await; // // tokio::time::sleep(Duration::from_millis(100)).await;
tokio::task::yield_now().await; // tokio::task::yield_now().await;
} // }
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
/// # Fn `get_container_id` /// # Fn `get_container_id`

View File

@ -8,6 +8,7 @@
use tokio::sync::mpsc::Sender as Sender; use tokio::sync::mpsc::Sender as Sender;
use tokio::time::Duration; use tokio::time::Duration;
use crate::options::structs::Events; use crate::options::structs::Events;
use async_trait::async_trait;
pub mod v2 { pub mod v2 {
use log::{error, info, warn}; use log::{error, info, warn};
@ -21,6 +22,7 @@
// type EventHandlers<'a> = HashMap<service name, sender object> // type EventHandlers<'a> = HashMap<service name, sender object>
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
#[derive(Debug)]
pub struct FilesController<'a> { pub struct FilesController<'a> {
name : &'a str, name : &'a str,
path : String, path : String,
@ -46,10 +48,10 @@
code_name : name.to_string(), code_name : name.to_string(),
} }
} }
pub async fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController<'a>> { pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController<'a>> {
self.path = path.as_ref().to_string_lossy().into_owned(); self.path = path.as_ref().to_string_lossy().into_owned();
self.watcher = { self.watcher = {
match create_watcher(self.name, &self.path).await { match create_watcher(self.name, &self.path) {
Ok(val) => Some(val), Ok(val) => Some(val),
Err(er) => { Err(er) => {
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er); error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
@ -60,6 +62,11 @@
self.code_name = format!("{}{}", &self.path, &self.code_name); self.code_name = format!("{}{}", &self.path, &self.code_name);
Ok(self) Ok(self)
} }
pub fn add_event(&mut self, file_controller : FilesController<'a>) {
for (k, v) in file_controller.triggers {
self.triggers.entry(k).or_insert(v);
}
}
async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) { async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) {
let _ = self.triggers.iter() let _ = self.triggers.iter()
.map(|(prc_name, (triggers, channel))| async { .map(|(prc_name, (triggers, channel))| async {
@ -77,6 +84,7 @@
}); });
} }
} }
#[async_trait]
impl<'a> ProcessUnit<'a> for FilesController<'a> { impl<'a> ProcessUnit<'a> for FilesController<'a> {
async fn process(&'a mut self) { async fn process(&'a mut self) {
// polling file check // polling file check
@ -92,7 +100,7 @@
) { ) {
warn!("File {} ({}) was changed", self.name, &self.path); warn!("File {} ({}) was changed", self.name, &self.path);
if recreate_watcher { if recreate_watcher {
self.watcher = match create_watcher(self.name, &self.path).await { self.watcher = match create_watcher(self.name, &self.path) {
Ok(notifier) => Some(notifier), Ok(notifier) => Some(notifier),
Err(er) => { Err(er) => {
error!("Failed to recreate watcher for {} ({}) due to {}", error!("Failed to recreate watcher for {} ({}) due to {}",
@ -135,7 +143,7 @@
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> { pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
let src = format!("{}{}", path, filename); let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?; let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
@ -214,7 +222,7 @@
let mutex = notify.borrow_mut(); let mutex = notify.borrow_mut();
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); // *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { if let Ok(watcher) = create_watcher(&file.filename, &file.src) {
*mutex = watcher; *mutex = watcher;
} }
} }
@ -277,12 +285,12 @@
use super::*; use super::*;
#[tokio::test] #[tokio::test]
async fn try_to_create_watcher() { async fn try_to_create_watcher() {
let res = create_watcher("dep-file", "./tests/examples/").await; let res = create_watcher("dep-file", "./tests/examples/");
assert!(res.is_ok()); assert!(res.is_ok());
} }
#[tokio::test] #[tokio::test]
async fn try_to_create_invalid_watcher() { async fn try_to_create_invalid_watcher() {
let res = create_watcher("invalid-file", "/path/to/the/no/dir").await; let res = create_watcher("invalid-file", "/path/to/the/no/dir");
assert!(res.is_err()); assert!(res.is_err());
} }
#[tokio::test] #[tokio::test]

View File

@ -5,6 +5,7 @@ use tokio::time::Duration;
use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit}; use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit};
use std::collections::HashSet; use std::collections::HashSet;
use tokio::sync::mpsc::Receiver as MpscReciever; use tokio::sync::mpsc::Receiver as MpscReciever;
use async_trait::async_trait;
pub mod v2 { pub mod v2 {
use log::info; use log::info;
@ -13,6 +14,7 @@ pub mod v2 {
use super::*; use super::*;
#[derive(Debug)]
pub struct ProcessesController<'a> { pub struct ProcessesController<'a> {
name: &'a str, name: &'a str,
bin: String, bin: String,
@ -72,8 +74,9 @@ pub mod v2 {
} }
} }
#[async_trait]
impl<'a> ProcessUnit<'a> for ProcessesController<'a> { impl<'a> ProcessUnit<'a> for ProcessesController<'a> {
async fn process(&mut self) { async fn process(&'a mut self) {
if self.negative_events.len() == 0 { if self.negative_events.len() == 0 {
match self.state { match self.state {
ProcessState::Holding => { ProcessState::Holding => {

View File

@ -6,6 +6,7 @@ use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::{Duration, Instant}; use tokio::time::{Duration, Instant};
use tokio::sync::mpsc::Sender as Sender; use tokio::sync::mpsc::Sender as Sender;
use async_trait::async_trait;
pub mod v2 { pub mod v2 {
use log::info; use log::info;
@ -52,19 +53,29 @@ pub mod v2 {
event_registrator : EventHandlers::new(), event_registrator : EventHandlers::new(),
} }
} }
pub fn with_params( pub fn with_access_name(
&mut self, mut self,
hostname: &'a str, hostname: &'a str,
port: Option<&'a str>, access_url: String,
) -> ServicesController<'a> {
self.name = hostname;
self.access_url = access_url;
self
}
pub fn with_params(
mut self,
conn_queue: ConnectionQueue<'a>, conn_queue: ConnectionQueue<'a>,
event_reg: EventHandlers<'a>, event_reg: EventHandlers<'a>,
) -> &mut ServicesController<'a> { ) -> ServicesController<'a> {
self.name = hostname;
self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)));
self.config = conn_queue; self.config = conn_queue;
self.event_registrator = event_reg; self.event_registrator = event_reg;
self self
} }
pub fn get_access_url(hostname: &'a str, port: Option<&u32>) -> String {
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
}
pub fn add_process( pub fn add_process(
&mut self, &mut self,
proc_name: &'a str, proc_name: &'a str,
@ -154,6 +165,7 @@ pub mod v2 {
} }
} }
} }
#[async_trait]
impl<'a> ProcessUnit<'a> for ServicesController<'a> { impl<'a> ProcessUnit<'a> for ServicesController<'a> {
async fn process(&'a mut self) { async fn process(&'a mut self) {
// check_service(hostname, port) // check_service(hostname, port)
@ -189,53 +201,53 @@ pub mod v2 {
/// ///
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` /// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
/// ///
pub async fn service_handler( // pub async fn service_handler(
name: &str, // name: &str,
services: &Vec<Services>, // services: &Vec<Services>,
tx: Arc<mpsc::Sender<u8>>, // tx: Arc<mpsc::Sender<u8>>,
) -> Result<(), CustomError> { // ) -> Result<(), CustomError> {
// println!("service daemon on {}", name); // // println!("service daemon on {}", name);
for serv in services { // for serv in services {
if check_service(&serv.hostname, &serv.port).await.is_err() { // if check_service(&serv.hostname, &serv.port).await.is_err() {
if !is_active(name).await || is_frozen(name).await { // if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal); // return Err(CustomError::Fatal);
} // }
error!( // error!(
"Service {}:{} is unreachable for process {}", // "Service {}:{} is unreachable for process {}",
&serv.hostname, &serv.port, &name // &serv.hostname, &serv.port, &name
); // );
match serv.triggers.on_lost.as_str() { // match serv.triggers.on_lost.as_str() {
"stay" => { // "stay" => {
tx.send(4).await.unwrap(); // tx.send(4).await.unwrap();
continue; // continue;
} // }
"stop" => { // "stop" => {
if looped_service_connecting(name, serv).await.is_err() { // if looped_service_connecting(name, serv).await.is_err() {
tx.send(5).await.unwrap(); // tx.send(5).await.unwrap();
tokio::task::yield_now().await; // tokio::task::yield_now().await;
return Err(CustomError::Fatal); // return Err(CustomError::Fatal);
} // }
} // }
"hold" => { // "hold" => {
// if is_frozen(name).await { // // if is_frozen(name).await {
// return Err(CustomError::Fatal); // // return Err(CustomError::Fatal);
// } // // }
if looped_service_connecting(name, serv).await.is_err() { // if looped_service_connecting(name, serv).await.is_err() {
tx.send(6).await.unwrap(); // tx.send(6).await.unwrap();
tokio::task::yield_now().await; // tokio::task::yield_now().await;
return Err(CustomError::Fatal); // return Err(CustomError::Fatal);
} // }
} // }
_ => { // _ => {
tx.send(101).await.unwrap(); // tx.send(101).await.unwrap();
return Err(CustomError::Fatal); // return Err(CustomError::Fatal);
} // }
} // }
} // }
} // }
tokio::time::sleep(Duration::from_millis(100)).await; // tokio::time::sleep(Duration::from_millis(100)).await;
Ok(()) // Ok(())
} // }
/// # Fn `looped_service_connecting` /// # Fn `looped_service_connecting`
/// ## for service's state check in loop (with delay and restriction of attempts) /// ## for service's state check in loop (with delay and restriction of attempts)
@ -250,54 +262,54 @@ pub async fn service_handler(
/// ///
/// *depends on* : fn `check_service` /// *depends on* : fn `check_service`
/// ///
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { // async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
if serv.triggers.wait == 0 { // if serv.triggers.wait == 0 {
loop { // loop {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; // tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!( // warn!(
"Attempting to connect from {} process to {}:{}", // "Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port // &name, &serv.hostname, &serv.port
); // );
match check_service(&serv.hostname, &serv.port).await { // match check_service(&serv.hostname, &serv.port).await {
Ok(_) => { // Ok(_) => {
log::info!( // log::info!(
"Successfully connected to {} from {} process!", // "Successfully connected to {} from {} process!",
&serv.hostname, // &serv.hostname,
&name // &name
); // );
break; // break;
} // }
Err(_) => { // Err(_) => {
tokio::task::yield_now().await; // tokio::task::yield_now().await;
} // }
} // }
} // }
Ok(()) // Ok(())
} else { // } else {
let start = Instant::now(); // let start = Instant::now();
while start.elapsed().as_secs() < serv.triggers.wait.into() { // while start.elapsed().as_secs() < serv.triggers.wait.into() {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; // tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!( // warn!(
"Attempting to connect from {} process to {}:{}", // "Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port // &name, &serv.hostname, &serv.port
); // );
match check_service(&serv.hostname, &serv.port).await { // match check_service(&serv.hostname, &serv.port).await {
Ok(_) => { // Ok(_) => {
log::info!( // log::info!(
"Successfully connected to {} from {} process!", // "Successfully connected to {} from {} process!",
&serv.hostname, // &serv.hostname,
&name // &name
); // );
return Ok(()); // return Ok(());
} // }
Err(_) => { // Err(_) => {
tokio::task::yield_now().await; // tokio::task::yield_now().await;
} // }
} // }
} // }
Err(CustomError::Fatal) // Err(CustomError::Fatal)
} // }
} // }
/// # Fn `check_service` /// # Fn `check_service`
/// ## for check current service's availiability /// ## for check current service's availiability