UPD: file changing handler

pull/9/head
prplV 2024-07-17 11:27:27 +03:00
parent b60e1c5b2f
commit 8bcff113c9
4 changed files with 44 additions and 32 deletions

2
Cargo.lock generated
View File

@ -233,7 +233,7 @@ dependencies = [
[[package]] [[package]]
name = "runner-rs" name = "runner-rs"
version = "0.3.0" version = "0.4.0"
dependencies = [ dependencies = [
"inotify", "inotify",
"serde", "serde",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "runner-rs" name = "runner-rs"
version = "0.3.0" version = "0.4.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -17,7 +17,7 @@
"src" : "/home/vladislav/web/", "src" : "/home/vladislav/web/",
"triggers" : { "triggers" : {
"onDelete" : "stop", "onDelete" : "stop",
"onChange" : "stop" "onChange" : "hold"
} }
} }
], ],

View File

@ -100,7 +100,7 @@ async fn main() {
proc.dependencies.services.len()); proc.dependencies.services.len());
// creating msg channel // creating msg channel
// can or should be executed in new thread (with Arc -> Rc) // can or should be executed in new thread
let (tx, mut rx) = mpsc::channel::<u8>(1); let (tx, mut rx) = mpsc::channel::<u8>(1);
let proc = Arc::new(proc.clone()); let proc = Arc::new(proc.clone());
@ -113,7 +113,6 @@ async fn main() {
for i in handler { for i in handler {
i.await.unwrap(); i.await.unwrap();
} }
return; return;
} }
@ -127,7 +126,6 @@ async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::
&src, &src,
WatchMask::MODIFY WatchMask::MODIFY
); );
Ok(inotify) Ok(inotify)
} }
@ -145,7 +143,7 @@ async fn run_daemons(
for file in proc.dependencies.files.clone().into_iter() { for file in proc.dependencies.files.clone().into_iter() {
watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
} }
let watchers_clone: Arc<Mutex<Vec<Inotify>>> = Arc::new(Mutex::new(watchers)); let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> = Arc::new(tokio::sync::Mutex::new(watchers));
loop { loop {
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
@ -176,7 +174,7 @@ async fn run_daemons(
}, },
// 4 - Timeout of waiting service-dependency -> staying (after waiting) // 4 - Timeout of waiting service-dependency -> staying (after waiting)
4 => { 4 => {
println!("Timeout of waiting service-dependency: Ignoring for {} process ..." , &proc.name); println!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
}, },
// 5 - Timeout of waiting service-dependency -> terminating (after waiting) // 5 - Timeout of waiting service-dependency -> terminating (after waiting)
5 => { 5 => {
@ -196,11 +194,21 @@ async fn run_daemons(
} }
}, },
// // 7 - File-dependency change -> terminating (after check) // // 7 - File-dependency change -> terminating (after check)
// 7 => {}, 7 => {
println!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
},
// // 8 - File-dependency change -> restarting (after check) // // 8 - File-dependency change -> restarting (after check)
// 8 => {}, 8 => {
println!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
let _ = restart_process(&proc.name, &proc.path).await;
tokio::time::sleep(Duration::from_millis(100)).await;
},
// // 9 - File-dependency change -> staying (after check) // // 9 - File-dependency change -> staying (after check)
// 9 => {}, 9 => {
println!("File-dependency warning (file changed). Ignoring on {} process...", &proc.name);
},
// 10 - Process unfreaze call via file handler // 10 - Process unfreaze call via file handler
10 => { 10 => {
@ -219,6 +227,9 @@ async fn run_daemons(
// 101 - Impermissible trigger values in JSON // 101 - Impermissible trigger values in JSON
101 => { 101 => {
println!("Impermissible trigger values in JSON"); println!("Impermissible trigger values in JSON");
if is_active(&proc.name).await {
terminate_process(&proc.name).await;
}
break; break;
}, },
_ => {}, _ => {},
@ -284,19 +295,21 @@ async fn terminate_process (name: &str) {
.expect("Failed to execute command 'pkill'"); .expect("Failed to execute command 'pkill'");
} }
async fn freeze_process(name: &str) { async fn freeze_process(name: &str) {
// let pid = get_pid(name);
let _ = Command::new("pkill") let _ = Command::new("pkill")
.args(["-STOP", name]) .args(["-STOP", name])
.output() .output()
.expect("Failed to freeze process"); .expect("Failed to freeze process");
} }
async fn unfreeze_process(name: &str) { async fn unfreeze_process(name: &str) {
// let pid = get_pid(name);
let _ = Command::new("pkill") let _ = Command::new("pkill")
.args(["-CONT", name]) .args(["-CONT", name])
.output() .output()
.expect("Failed to unfreeze process"); .expect("Failed to unfreeze process");
} }
async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> {
terminate_process(name).await;
return start_process(name, path).await;
}
async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
let runsh = format!("{}{}", path, "/run.sh"); let runsh = format!("{}{}", path, "/run.sh");
@ -318,7 +331,7 @@ async fn running_handler
( (
prc: Arc<TrackingProcess>, prc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<Mutex<Vec<Inotify>>> watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>
) )
{ {
// println!("running daemon on {}", prc.name); // println!("running daemon on {}", prc.name);
@ -348,7 +361,7 @@ async fn file_handler
name: &str, name: &str,
files: &Vec<Files>, files: &Vec<Files>,
tx: Arc<mpsc::Sender<u8>>, tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<Mutex<Vec<Inotify>>> watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>
) -> Result<(), CustomError> ) -> Result<(), CustomError>
{ {
// println!("file daemon on {}", name); // println!("file daemon on {}", name);
@ -383,27 +396,26 @@ async fn file_handler
} else if is_active(name).await && !is_frozen(name).await{ } else if is_active(name).await && !is_frozen(name).await{
let watchers = watchers.clone(); let watchers = watchers.clone();
let mut buffer = [0; 128]; let mut buffer = [0; 128];
let mut mutex_guard = watchers.lock().unwrap(); let mut mutex_guard = watchers.lock().await;
if let Some(notify) = mutex_guard.get_mut(i) { if let Some(notify) = mutex_guard.get_mut(i) {
let events = notify.read_events(&mut buffer); let events = notify.read_events(&mut buffer);
match events { if events.is_ok() {
Ok(_) => { match file.triggers.on_change.as_str() {
println!("file {} changed!", &file.filename) "stop" => {
// match file.triggers.on_change.as_str() { let _ = tx.send(7).await;
// "stop" => {
// tx.send(7).await.unwrap();
// },
// "restart" => {},
// "hold" => {},
// _ => {},
// }
}, },
Err(_) => { "restart" => {
let _ = tx.send(8).await;
},
"stay" => {
let _ = tx.send(9).await;
},
_ => {
let _ = tx.send(101).await;
}, },
} }
} }
// println!("after if let "); }
} }
} }
tokio::task::yield_now().await; tokio::task::yield_now().await;