file change handling unit is added now 0_0

pull/9/head
prplV 2024-07-12 16:03:24 +03:00
parent cf65f2c2ea
commit b62eb8caf6
4 changed files with 90 additions and 178 deletions

131
Cargo.lock generated
View File

@ -69,40 +69,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]] [[package]]
name = "crossbeam-channel" name = "futures-core"
version = "0.5.13" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]]
name = "filetime"
version = "0.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.4.1",
"windows-sys 0.52.0",
]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "gimli" name = "gimli"
@ -118,13 +88,15 @@ checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]] [[package]]
name = "inotify" name = "inotify"
version = "0.9.6" version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" checksum = "fdd168d97690d0b8c412d6b6c10360277f4d7ee495c5d0d5d5fe0854923255cc"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
"futures-core",
"inotify-sys", "inotify-sys",
"libc", "libc",
"tokio",
] ]
[[package]] [[package]]
@ -142,26 +114,6 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "kqueue"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.155" version = "0.2.155"
@ -178,12 +130,6 @@ dependencies = [
"scopeguard", "scopeguard",
] ]
[[package]]
name = "log"
version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.4" version = "2.7.4"
@ -206,30 +152,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [ dependencies = [
"libc", "libc",
"log",
"wasi", "wasi",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "notify"
version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [
"bitflags 2.6.0",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio",
"walkdir",
"windows-sys 0.48.0",
]
[[package]] [[package]]
name = "num_cpus" name = "num_cpus"
version = "1.16.0" version = "1.16.0"
@ -267,7 +193,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall 0.5.2", "redox_syscall",
"smallvec", "smallvec",
"windows-targets 0.52.5", "windows-targets 0.52.5",
] ]
@ -296,15 +222,6 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "redox_syscall"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags 1.3.2",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.5.2" version = "0.5.2"
@ -316,9 +233,9 @@ dependencies = [
[[package]] [[package]]
name = "runner-rs" name = "runner-rs"
version = "0.1.10" version = "0.3.0"
dependencies = [ dependencies = [
"notify", "inotify",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
@ -336,15 +253,6 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
@ -454,31 +362,12 @@ version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.11.0+wasi-snapshot-preview1" version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi-util"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b"
dependencies = [
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"

View File

@ -1,10 +1,10 @@
[package] [package]
name = "runner-rs" name = "runner-rs"
version = "0.1.10" version = "0.3.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
notify = "6.1.1" inotify = "0.10.2"
serde = { version = "1.0.203", features = ["derive"] } serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.118" serde_json = "1.0.118"
tokio = { version = "1.38.0", features = ["full", "time"] } tokio = { version = "1.38.0", features = ["full", "time"] }

View File

@ -9,7 +9,7 @@
"src" : "/home/vladislav/web/", "src" : "/home/vladislav/web/",
"triggers" : { "triggers" : {
"onDelete" : "stop", "onDelete" : "stop",
"onChange" : "hold" "onChange" : "restart"
} }
}, },
{ {
@ -17,7 +17,7 @@
"src" : "/home/vladislav/web/", "src" : "/home/vladislav/web/",
"triggers" : { "triggers" : {
"onDelete" : "stop", "onDelete" : "stop",
"onChange" : "hold" "onChange" : "stop"
} }
} }
], ],
@ -42,7 +42,7 @@
"src" : "/home/vladislav/web/", "src" : "/home/vladislav/web/",
"triggers" : { "triggers" : {
"onDelete" : "hold", "onDelete" : "hold",
"onChange" : "hold" "onChange" : "restart"
} }
}], }],
"services" : [{ "services" : [{

View File

@ -3,13 +3,13 @@ use serde_json;
use tokio::join; use tokio::join;
use core::panic; use core::panic;
use std::fmt::Debug; use std::fmt::Debug;
use std::fs; use std::{clone, fs};
use std::path::Path; use std::path::Path;
use std::process::{ Command, Output }; use std::process::{ Command, Output };
use std::sync::Arc; use std::sync::{Arc, Mutex};
use tokio::time::{ Duration, Instant }; use tokio::time::{ Duration, Instant };
use tokio::sync::mpsc; use tokio::sync::{mpsc, watch};
use notify::{Watcher, RecursiveMode, Result as res, Event, RecommendedWatcher, Error as notify_error}; use inotify::{ Inotify, WatchMask };
/// # an Error enum (nextly will be deleted and replaced) /// # an Error enum (nextly will be deleted and replaced)
enum CustomError { enum CustomError {
@ -106,7 +106,7 @@ async fn main() {
let proc = Arc::new(proc.clone()); let proc = Arc::new(proc.clone());
let tx = Arc::new(tx.clone()); let tx = Arc::new(tx.clone());
let event = tokio::spawn(async move { let event = tokio::spawn(async move {
run_daemons(proc, tx, &mut rx).await; run_daemons(proc.clone(), tx.clone(), &mut rx).await;
}); });
handler.push(event); handler.push(event);
} }
@ -117,6 +117,20 @@ async fn main() {
return; return;
} }
async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
let src = format!("{}{}", path, filename);
let mut inotify: Inotify = Inotify::init().expect(&("Error: Cannot create watcher for ".to_owned() + &src));
_ = inotify
.watches()
.add(
&src,
WatchMask::MODIFY
);
Ok(inotify)
}
/// # async func to run 3 main daemons (now its more like tree-form than classiacl 0.1.0 form ) /// # async func to run 3 main daemons (now its more like tree-form than classiacl 0.1.0 form )
/// > hint : give mpsc with capacity 1 to jump over potential errors during running process /// > hint : give mpsc with capacity 1 to jump over potential errors during running process
/// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") ** /// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") **
@ -126,10 +140,15 @@ async fn run_daemons(
rx: &mut mpsc::Receiver<u8> rx: &mut mpsc::Receiver<u8>
) )
{ {
// clone name // creating watchers + ---buffers---
// clone path let mut watchers: Vec<Inotify> = vec![];
for file in proc.dependencies.files.clone().into_iter() {
watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
}
let watchers_clone: Arc<Mutex<Vec<Inotify>>> = Arc::new(Mutex::new(watchers));
loop { loop {
let run_hand = running_handler(proc.clone(), tx.clone()); let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
tokio::select! { tokio::select! {
_ = run_hand => {}, _ = run_hand => {},
_val = rx.recv() => { _val = rx.recv() => {
@ -139,6 +158,7 @@ async fn run_daemons(
if is_active(&proc.name).await { if is_active(&proc.name).await {
println!("Dependency handling error: Terminating {} process ..." , &proc.name); println!("Dependency handling error: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await; terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
} }
// break; // break;
}, },
@ -147,6 +167,7 @@ async fn run_daemons(
if !is_frozen(&proc.name).await { if !is_frozen(&proc.name).await {
println!("Dependency handling error: Freezing {} process ..." , &proc.name); println!("Dependency handling error: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await; freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
} }
}, },
// 3 - Running process error // 3 - Running process error
@ -162,6 +183,7 @@ async fn run_daemons(
if is_active(&proc.name).await { if is_active(&proc.name).await {
println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await; terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
} }
// break; // break;
}, },
@ -170,6 +192,7 @@ async fn run_daemons(
if !is_frozen(&proc.name).await { if !is_frozen(&proc.name).await {
println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await; freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
} }
}, },
// // 7 - File-dependency change -> terminating (after check) // // 7 - File-dependency change -> terminating (after check)
@ -291,10 +314,16 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
} }
} }
// check process status daemon // check process status daemon
async fn running_handler(prc: Arc<TrackingProcess>, tx: Arc<mpsc::Sender<u8>>){ async fn running_handler
(
prc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<Mutex<Vec<Inotify>>>
)
{
// println!("running daemon on {}", prc.name); // println!("running daemon on {}", prc.name);
// services and files check (once) // services and files check (once)
let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone()); let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone());
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
let res = join!(files_check, services_check); let res = join!(files_check, services_check);
@ -314,9 +343,16 @@ async fn running_handler(prc: Arc<TrackingProcess>, tx: Arc<mpsc::Sender<u8>>){
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
async fn file_handler(name: &str, files: &Vec<Files>, tx: Arc<mpsc::Sender<u8>>) -> Result<(), CustomError>{ async fn file_handler
(
name: &str,
files: &Vec<Files>,
tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<Mutex<Vec<Inotify>>>
) -> Result<(), CustomError>
{
// println!("file daemon on {}", name); // println!("file daemon on {}", name);
for file in files { for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename); // let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() { if check_file(&file.filename, &file.src).await.is_err() {
if !is_active(name).await || is_frozen(name).await { if !is_active(name).await || is_frozen(name).await {
@ -344,48 +380,35 @@ async fn file_handler(name: &str, files: &Vec<Files>, tx: Arc<mpsc::Sender<u8>>)
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
}, },
} }
} else if is_active(name).await && !is_frozen(name).await{
let watchers = watchers.clone();
let mut buffer = [0; 128];
let mut mutex_guard = watchers.lock().unwrap();
if let Some(notify) = mutex_guard.get_mut(i) {
let events = notify.read_events(&mut buffer);
match events {
Ok(_) => {
match file.triggers.on_change.as_str() {
"stop" => {
// tx.send(7).await.unwrap();
},
"restart" => {},
"hold" => {},
_ => {},
}
},
Err(_) => {
},
}
}
// println!("after if let ");
} }
} }
tokio::time::sleep(Duration::from_millis(100)).await;
tokio::task::yield_now().await; tokio::task::yield_now().await;
Ok(()) Ok(())
} }
// async fn async_watch<P: AsRef<Path>>(path: P, tx_main: Arc<mpsc::Sender<u8>>) -> Result<(), CustomError> {
// println!("{:?}", path.as_ref());
// let (tx, mut rx) = mpsc::channel(1);
// let mut watcher = RecommendedWatcher::new(
// move |res| {
// let tx_clone = tx.clone();
// println!("in move");
// tokio::task::spawn_blocking(move || {
// println!("in spawn blocking");
// tokio::spawn(async move {
// println!("in spawn");
// tx_clone.send(res).await.unwrap();
// });
// });
// },
// notify::Config::default(),
// ).unwrap();
// watcher.watch(path.as_ref(), RecursiveMode::NonRecursive).unwrap();
// println!("after watch");
// while let Some(res) = rx.recv().await {
// println!("in while iteration");
// match res {
// Ok(event) => {
// println!("changed: {:?}", event);
// // tx_main.send().await.unwrap();
// },
// Err(_) => {
// println!("in err");
// },
// }
// }
// println!("after all");
// Ok(())
// }
async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string()); let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string()); let arc_path = Arc::new(path.to_string());