sync to async

pull/9/head
prplV 2024-07-11 13:32:52 +03:00
parent a2738e9fc7
commit 9acb1843e9
3 changed files with 72 additions and 44 deletions

2
Cargo.lock generated
View File

@ -199,7 +199,7 @@ dependencies = [
[[package]] [[package]]
name = "runner-rs" name = "runner-rs"
version = "0.1.0" version = "0.1.10"
dependencies = [ dependencies = [
"serde", "serde",
"serde_json", "serde_json",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "runner-rs" name = "runner-rs"
version = "0.1.0" version = "0.1.10"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -1,11 +1,13 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json; use serde_json;
use tokio::join; use tokio::join;
use core::panic;
use std::fmt::Debug; use std::fmt::Debug;
use std::fs; use std::fs;
use std::ops::RangeInclusive; use std::future::IntoFuture;
use std::path::Path; use std::path::Path;
use std::process::{Command, Output}; use std::process::{Command, Output};
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{Duration, Instant}; use tokio::time::{Duration, Instant};
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -99,6 +101,7 @@ async fn main() {
proc.dependencies.services.len()); proc.dependencies.services.len());
// creating msg channel // creating msg channel
// can or should be executed in new thread (with Arc -> Rc)
let (tx, mut rx) = mpsc::channel::<u8>(1); let (tx, mut rx) = mpsc::channel::<u8>(1);
let proc = Arc::new(proc.clone()); let proc = Arc::new(proc.clone());
@ -124,16 +127,17 @@ async fn run_daemons(
rx: &mut mpsc::Receiver<u8> rx: &mut mpsc::Receiver<u8>
) )
{ {
// clone name
// clone path
loop { loop {
let run_hand = running_handler(&proc, tx.clone()); let run_hand = running_handler(proc.clone(), tx.clone());
tokio::select! { tokio::select! {
_ = run_hand => {}, _ = run_hand => {},
_val = rx.recv() => { _val = rx.recv() => {
match _val.unwrap() { match _val.unwrap() {
// 1 - File-dependency handling error -> terminating (after waiting) // 1 - File-dependency handling error -> terminating (after waiting)
1 => { 1 => {
if is_active(&proc.name) { if is_active(&proc.name).await {
println!("Dependency handling error: Terminating {} process ..." , &proc.name); println!("Dependency handling error: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await; terminate_process(&proc.name).await;
} }
@ -141,7 +145,7 @@ async fn run_daemons(
}, },
// 2 - File-dependency handling error -> holding (after waiting) // 2 - File-dependency handling error -> holding (after waiting)
2 => { 2 => {
if !is_frozen(&proc.name) { if !is_frozen(&proc.name).await {
println!("Dependency handling error: Freezing {} process ..." , &proc.name); println!("Dependency handling error: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await; freeze_process(&proc.name).await;
} }
@ -156,7 +160,7 @@ async fn run_daemons(
}, },
// 5 - Timeout of waiting service-dependency -> terminating (after waiting) // 5 - Timeout of waiting service-dependency -> terminating (after waiting)
5 => { 5 => {
if is_active(&proc.name) { if is_active(&proc.name).await {
println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await; terminate_process(&proc.name).await;
} }
@ -164,7 +168,7 @@ async fn run_daemons(
}, },
// 6 - Timeout of waiting service-dependency -> holding (after waiting) // 6 - Timeout of waiting service-dependency -> holding (after waiting)
6 => { 6 => {
if !is_frozen(&proc.name) { if !is_frozen(&proc.name).await {
println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await; freeze_process(&proc.name).await;
} }
@ -178,14 +182,14 @@ async fn run_daemons(
// 10 - Process unfreaze call via file handler // 10 - Process unfreaze call via file handler
10 => { 10 => {
if is_frozen(&proc.name) { if is_frozen(&proc.name).await {
println!("Unfreezing process {} call...", &proc.name); println!("Unfreezing process {} call...", &proc.name);
unfreeze_process(&proc.name).await; unfreeze_process(&proc.name).await;
} }
}, },
// 11 - Process unfreaze call via service handler // 11 - Process unfreaze call via service handler
11 => { 11 => {
if is_frozen(&proc.name) { if is_frozen(&proc.name).await {
println!("Unfreezing process {} call...", &proc.name); println!("Unfreezing process {} call...", &proc.name);
unfreeze_process(&proc.name).await; unfreeze_process(&proc.name).await;
} }
@ -201,38 +205,54 @@ async fn run_daemons(
} }
// tokio::task::yield_now().await; // tokio::task::yield_now().await;
} }
} }
// 4ever sync
fn load_processes(json_filename: &str) -> Processes{ fn load_processes(json_filename: &str) -> Processes{
let read = fs::read_to_string(json_filename).expect(format!("Missing '{}' file. Cannot start runner", json_filename).as_str()); let read = fs::read_to_string(json_filename).expect(format!("Missing '{}' file. Cannot start runner", json_filename).as_str());
serde_json::from_str::<Processes>(&read).expect(format!("Parsing error in '{}' file. Cannot start runner", json_filename).as_str()) serde_json::from_str::<Processes>(&read).expect(format!("Parsing error in '{}' file. Cannot start runner", json_filename).as_str())
} }
fn get_pid(name: &str) -> Output { async fn get_pid(name: &str) -> Output {
Command::new("pidof") let name = Arc::new(name.to_string());
.arg(name) tokio::task::spawn_blocking(move || {
Command::new("pidof")
.arg(&*name)
.output() .output()
.expect("Failed to execute command 'pidof'") .expect("Failed to execute command 'pidof'")
})
.await
.unwrap()
} }
fn is_active(name: &str)-> bool { async fn is_active(name: &str)-> bool {
let output = Command::new("pidof") let arc_name = Arc::new(name.to_string());
.arg(name) tokio::task::spawn_blocking(move || {
let output = Command::new("pidof")
.arg(&*arc_name)
.output() .output()
.expect("Failed to execute command 'pidof'"); .expect("Failed to execute command 'pidof'");
!String::from_utf8_lossy(&output.stdout).trim().is_empty() !String::from_utf8_lossy(&output.stdout).trim().is_empty()
})
.await
.unwrap()
} }
fn is_frozen(name: &str) -> bool { async fn is_frozen(name: &str) -> bool {
let temp = get_pid(name); let temp = get_pid(name).await;
let pid = String::from_utf8_lossy(&temp.stdout); let pid = String::from_utf8_lossy(&temp.stdout);
if pid.trim().is_empty(){ let pid = pid.trim();
let arc_pid = Arc::new(pid.to_string());
if pid.is_empty(){
return false; return false;
} else { } else {
let cmd = Command::new("ps") tokio::task::spawn_blocking(move || {
.args(["-o", "stat=", "-p", pid.trim()]) let cmd = Command::new("ps")
.output() .args(["-o", "stat=", "-p", &arc_pid])
.expect("Failed to execute ps command"); .output()
return !(String::from_utf8_lossy(&cmd.stdout) == "Sl+\n"); .expect("Failed to execute ps command");
!(String::from_utf8_lossy(&cmd.stdout) == "Sl+\n")
})
.await
.unwrap()
} }
} }
async fn terminate_process (name: &str) { async fn terminate_process (name: &str) {
@ -272,7 +292,7 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
} }
} }
// check process status daemon // check process status daemon
async fn running_handler(prc: &TrackingProcess, tx: Arc<mpsc::Sender<u8>>){ async fn running_handler(prc: Arc<TrackingProcess>, tx: Arc<mpsc::Sender<u8>>){
// println!("running daemon on {}", prc.name); // println!("running daemon on {}", prc.name);
// services and files check (once) // services and files check (once)
let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone()); let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone());
@ -280,14 +300,14 @@ async fn running_handler(prc: &TrackingProcess, tx: Arc<mpsc::Sender<u8>>){
let res = join!(files_check, services_check); let res = join!(files_check, services_check);
// if inactive -> spawn checks -> active is true // if inactive -> spawn checks -> active is true
if !is_active(&prc.name) && res.0.is_ok() && res.1.is_ok(){ if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok(){
if start_process(&prc.name, &prc.path).await.is_err() { if start_process(&prc.name, &prc.path).await.is_err() {
tx.send(3).await.unwrap(); tx.send(3).await.unwrap();
return; return;
} }
} }
// if frozen -> spawn checks -> unfreeze is true // if frozen -> spawn checks -> unfreeze is true
else if is_frozen(&prc.name) && res.0.is_ok() && res.1.is_ok(){ else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok(){
tx.send(10).await.unwrap(); tx.send(10).await.unwrap();
return; return;
} }
@ -298,8 +318,8 @@ async fn running_handler(prc: &TrackingProcess, tx: Arc<mpsc::Sender<u8>>){
async fn file_handler(name: &str, files: &Vec<Files>, tx: Arc<mpsc::Sender<u8>>) -> Result<(), CustomError>{ async fn file_handler(name: &str, files: &Vec<Files>, tx: Arc<mpsc::Sender<u8>>) -> Result<(), CustomError>{
// println!("file daemon on {}", name); // println!("file daemon on {}", name);
for file in files { for file in files {
if check_file(&file.filename, &file.src).is_err() { if check_file(&file.filename, &file.src).await.is_err() {
if !is_active(name) || is_frozen(name) { if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
match file.triggers.on_delete.as_str() { match file.triggers.on_delete.as_str() {
@ -307,13 +327,13 @@ async fn file_handler(name: &str, files: &Vec<Files>, tx: Arc<mpsc::Sender<u8>>)
continue; continue;
}, },
"stop" => { "stop" => {
if is_active(name) { if is_active(name).await {
tx.send(1).await.unwrap(); tx.send(1).await.unwrap();
} }
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
}, },
"hold" => { "hold" => {
if is_active(name) { if is_active(name).await {
tx.send(2).await.unwrap(); tx.send(2).await.unwrap();
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
@ -330,21 +350,29 @@ async fn file_handler(name: &str, files: &Vec<Files>, tx: Arc<mpsc::Sender<u8>>)
tokio::task::yield_now().await; tokio::task::yield_now().await;
Ok(()) Ok(())
} }
fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let fileconcat = format!("{}{}", path, filename); let arc_name = Arc::new(filename.to_string());
let path = Path::new(&fileconcat); let arc_path = Arc::new(path.to_string());
if path.exists() { tokio::task::spawn_blocking(move || {
Ok(()) let fileconcat = format!("{}{}", arc_path, arc_name);
} else { let path = Path::new(&fileconcat);
Err(CustomError::Fatal) if path.exists() {
} Ok(())
} else {
Err(CustomError::Fatal)
}
})
.await
.unwrap_or_else(|_| {
panic!("Corrupted while file check process");
})
} }
async fn service_handler(name: &str, services: &Vec<Services>, tx: Arc<mpsc::Sender<u8>>) -> Result<(), CustomError> { async fn service_handler(name: &str, services: &Vec<Services>, tx: Arc<mpsc::Sender<u8>>) -> Result<(), CustomError> {
// println!("service daemon on {}", name); // println!("service daemon on {}", name);
for serv in services { for serv in services {
if check_service(&serv.hostname, &serv.port).await.is_err() { if check_service(&serv.hostname, &serv.port).await.is_err() {
if !is_active(name) || is_frozen(name) { if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name);
@ -359,7 +387,7 @@ async fn service_handler(name: &str, services: &Vec<Services>, tx: Arc<mpsc::Sen
} }
}, },
"hold" => { "hold" => {
if is_frozen(name) { if is_frozen(name).await {
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
if looped_service_connecting(name, serv).await.is_err() { if looped_service_connecting(name, serv).await.is_err() {
@ -391,7 +419,7 @@ async fn looped_service_connecting(
println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
match check_service(&serv.hostname, &serv.port).await { match check_service(&serv.hostname, &serv.port).await {
Ok(_) => { Ok(_) => {
println!("SUCCESS!"); println!("Successfully connected to {} from {} process!", &serv.hostname, &name);
break; break;
}, },
Err(_) => { Err(_) => {