debug -> opened issue #40

feature/configv2
prplV 2025-05-04 11:01:07 -04:00
parent 403285a937
commit 3dd238cf97
4 changed files with 21 additions and 16 deletions

View File

@ -8,7 +8,7 @@ anyhow = "1.0.93"
chrono = "0.4.38"
clap = { version = "4.5.21", features = ["derive"] }
env_logger = "0.11.3"
inotify = "0.10.2"
inotify = "0.11.0"
log = "0.4.22"
pcap = "2.2.0"
redis = "0.29.2"

View File

@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use std::sync::Arc;
#[derive(Debug)]
pub enum DependencyType {
File,
Service,
@ -93,10 +94,12 @@ pub enum ProcessState {
Stopped,
StoppedByCli,
}
#[derive(Debug)]
pub enum Events {
Positive(Arc<str>),
Negative(NegativeOutcomes)
}
#[derive(Debug)]
pub enum NegativeOutcomes {
FileWasChanged(Arc<str>, DependencyType, Arc<str>),
FileWasMovedOrDeleted(Arc<str>, DependencyType, Arc<str>),

View File

@ -21,7 +21,6 @@ use std::sync::Arc;
// use tokio::join;
use tokio::sync::mpsc;
use tokio::time::Duration;
use tokio::sync::broadcast::Receiver;
// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
// controllers import
use prcs::v2::ProcessesController;
@ -36,6 +35,7 @@ pub mod v2 {
use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers};
use super::*;
#[derive(Debug)]
enum ControllerResult {
Process(Option<ProcessesController>),
File(Option<FilesController>),
@ -127,6 +127,7 @@ pub mod v2 {
async fn process(&mut self) {
info!("Initializing monitoring ...");
loop {
// dbg!(&self);
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
// let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap());
// let res = tokio::join!(prc.process(), file.process(), serv.process());

View File

@ -65,20 +65,18 @@
}
}
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
let _ = self.triggers.iter()
.map(|(prc_name, (triggers, channel))| async {
let _ = channel.send({
match &trigger_type {
for (prc_name, (triggers, channel)) in &self.triggers {
let msg = match &trigger_type {
None => {
Events::Positive(self.code_name.clone())
},
Some(event) => {
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
event.event_from_file_trigger_controller(self.code_name.clone(), triggers)
info!("Event on file {} ({}) : {}. Notifying `{}` ...", &self.name, &self.path, event, &prc_name);
event.event_from_file_trigger_controller(self.code_name.clone(), &triggers)
},
};
let _ = channel.send(msg).await;
}
}).await;
});
}
}
#[async_trait]
@ -86,11 +84,14 @@
async fn process(&mut self) {
// polling file check
// 1) existing check
// dbg!(&self);
if let Ok(_) = check_file(&self.name, &self.path).await {
match &mut self.watcher {
Some(notify) => {
let mut buffer = [0; 1024];
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
// notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);});
// todo!();
if let (recreate_watcher, true) = (
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
notif_events.any(|mask| mask.mask == EventMask::MODIFY)