Merge pull request 'master' (#15) from master into dev
Reviewed-on: http://git.enode/VladislavD/runner-rs/pulls/15pull/19/head
commit
4fb3533074
|
|
@ -1,5 +1,4 @@
|
||||||
/target
|
/target
|
||||||
.idea
|
.idea
|
||||||
Dockerfile
|
|
||||||
Cargo.lock
|
Cargo.lock
|
||||||
settings.json
|
hagent_test.sock
|
||||||
File diff suppressed because it is too large
Load Diff
24
Cargo.toml
24
Cargo.toml
|
|
@ -1,20 +1,12 @@
|
||||||
[package]
|
[workspace]
|
||||||
name = "runner-rs"
|
resolver = "2"
|
||||||
version = "0.9.25"
|
members = [
|
||||||
edition = "2021"
|
"noxis-rs",
|
||||||
|
"noxis-cli",
|
||||||
|
]
|
||||||
|
|
||||||
[profile.dev]
|
[profile.dev]
|
||||||
debug = true
|
debug = true
|
||||||
|
|
||||||
[dependencies]
|
[profile.test]
|
||||||
anyhow = "1.0.93"
|
debug = false
|
||||||
chrono = "0.4.38"
|
|
||||||
env_logger = "0.11.3"
|
|
||||||
inotify = "0.10.2"
|
|
||||||
log = "0.4.22"
|
|
||||||
pcap = "2.2.0"
|
|
||||||
redis = "0.25.4"
|
|
||||||
serde = { version = "1.0.203", features = ["derive"] }
|
|
||||||
serde_json = "1.0.118"
|
|
||||||
sysinfo = "0.32.0"
|
|
||||||
tokio = { version = "1.38.0", features = ["full", "time"] }
|
|
||||||
|
|
|
||||||
13
README.md
13
README.md
|
|
@ -1,13 +1,16 @@
|
||||||
|
|
||||||
# runner-rs ( with amd64 and riscv64 support )
|
# noxis-rs
|
||||||

|

|
||||||
in-container integrating util to handle processes runtime
|
### In-container integrating util to handle processes runtime
|
||||||
|
( with amd64 and riscv64 support )
|
||||||
|
|
||||||
## Depends on
|
## Depends on
|
||||||
- `rustup (>=1.27.1)`
|
- `rustup (>=1.27.1)`
|
||||||
- `gcc-riscv64-unknown-elf`
|
- `gcc-riscv64-unknown-elf`
|
||||||
- `build-essential`
|
- `build-essential`
|
||||||
|
- `gcc-riscv64-linux-gnu`
|
||||||
|
- `binutils-riscv64-linux-gnu`
|
||||||
|
|
||||||
|
|
||||||
## Setting up
|
## Setting up
|
||||||
Download and execute rustup.sh
|
Download and execute rustup.sh
|
||||||
|
|
@ -29,7 +32,7 @@ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
|
||||||
cd runner-rs/ && rustup target add riscv64gc-unknown-linux-gnu && rustup target add x86_64-unknown-linux-gnu
|
cd runner-rs/ && rustup target add riscv64gc-unknown-linux-gnu && rustup target add x86_64-unknown-linux-gnu
|
||||||
~~~
|
~~~
|
||||||
> [!NOTE]
|
> [!NOTE]
|
||||||
> Cargo is configured to build an app for amd64/linux defaultly. RISC-based compilation is optional.
|
> Cargo is configured to build an app for amd64/linux defaultly. RISCV-based compilation is optional.
|
||||||
|
|
||||||
3.1. Release build of app for amd64/linux
|
3.1. Release build of app for amd64/linux
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
[package]
|
||||||
|
name = "noxis-cli"
|
||||||
|
version = "0.1.6"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0.94"
|
||||||
|
clap = { version = "4.5.22", features = ["derive"] }
|
||||||
|
serde = { version = "1.0.215", features = ["derive"] }
|
||||||
|
serde_json = "1.0.133"
|
||||||
|
tokio = { version = "1.42.0", features = ["full", "net"] }
|
||||||
|
|
@ -0,0 +1,145 @@
|
||||||
|
use clap::{Parser, Subcommand};
|
||||||
|
|
||||||
|
#[derive(Debug, Parser)]
|
||||||
|
pub struct Cli {
|
||||||
|
#[command(
|
||||||
|
subcommand,
|
||||||
|
help = "to manage Noxis work",
|
||||||
|
)]
|
||||||
|
command : Commands,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Subcommand)]
|
||||||
|
pub enum Commands {
|
||||||
|
#[command(
|
||||||
|
about = "To get info about current Noxis status",
|
||||||
|
)]
|
||||||
|
Status,
|
||||||
|
#[command(
|
||||||
|
about = "To start Noxis process",
|
||||||
|
)]
|
||||||
|
Start(StartAction),
|
||||||
|
#[command(
|
||||||
|
about = "To stop Noxis process",
|
||||||
|
)]
|
||||||
|
Stop,
|
||||||
|
#[command(
|
||||||
|
about = "To restart Noxis process",
|
||||||
|
)]
|
||||||
|
Restart(StartAction),
|
||||||
|
#[command(
|
||||||
|
about = "To get list of processes that are being monitoring",
|
||||||
|
)]
|
||||||
|
Processes,
|
||||||
|
// process command
|
||||||
|
#[command(
|
||||||
|
about = "To manage current process that is being monitoring",
|
||||||
|
)]
|
||||||
|
Process(ProcessCommand),
|
||||||
|
// config command =
|
||||||
|
#[command(
|
||||||
|
about = "To manage config settings",
|
||||||
|
)]
|
||||||
|
Config(ConfigCommand),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Parser)]
|
||||||
|
pub struct StartAction {
|
||||||
|
#[arg(
|
||||||
|
long="with-flags",
|
||||||
|
num_args = 1..,
|
||||||
|
value_delimiter = ' '
|
||||||
|
)]
|
||||||
|
flags : Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Parser)]
|
||||||
|
pub struct ConfigCommand {
|
||||||
|
#[command(subcommand)]
|
||||||
|
action : ConfigAction,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Subcommand)]
|
||||||
|
pub enum ConfigAction {
|
||||||
|
#[command(
|
||||||
|
about = "To change current Noxis configuration",
|
||||||
|
)]
|
||||||
|
Local(LocalConfig),
|
||||||
|
#[command(
|
||||||
|
about = "To change credentials of the remote config server",
|
||||||
|
)]
|
||||||
|
Remote,
|
||||||
|
#[command(
|
||||||
|
about = "To reset all config settings",
|
||||||
|
)]
|
||||||
|
Reset,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Parser)]
|
||||||
|
pub struct LocalConfig {
|
||||||
|
// flag
|
||||||
|
#[arg(
|
||||||
|
long = "json",
|
||||||
|
action,
|
||||||
|
help = "to read following input as JSON",
|
||||||
|
)]
|
||||||
|
is_json : bool,
|
||||||
|
// value
|
||||||
|
#[arg(
|
||||||
|
help = "path to config file or config String (with --json flag)",
|
||||||
|
)]
|
||||||
|
config : String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Parser)]
|
||||||
|
pub struct ProcessCommand {
|
||||||
|
#[arg(
|
||||||
|
help = "name of needed process",
|
||||||
|
)]
|
||||||
|
process : String,
|
||||||
|
#[command(
|
||||||
|
subcommand,
|
||||||
|
help = "To get current process's status",
|
||||||
|
)]
|
||||||
|
action : ProcessAction,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Subcommand)]
|
||||||
|
enum ProcessAction {
|
||||||
|
#[command(
|
||||||
|
about = "To get info about current process status",
|
||||||
|
)]
|
||||||
|
Status,
|
||||||
|
#[command(
|
||||||
|
about = "To start current process",
|
||||||
|
)]
|
||||||
|
Start,
|
||||||
|
#[command(
|
||||||
|
about = "To stop current process",
|
||||||
|
)]
|
||||||
|
Stop,
|
||||||
|
#[command(
|
||||||
|
about = "To freeze (hybernaze) current process",
|
||||||
|
)]
|
||||||
|
Freeze,
|
||||||
|
#[command(
|
||||||
|
about = "To unfreeze (unhybernaze) current process",
|
||||||
|
)]
|
||||||
|
Unfreeze,
|
||||||
|
#[command(
|
||||||
|
about = "To restart current process",
|
||||||
|
)]
|
||||||
|
Restart,
|
||||||
|
#[command(
|
||||||
|
about = "To get info about current process's dependencies",
|
||||||
|
)]
|
||||||
|
Deps,
|
||||||
|
#[command(
|
||||||
|
about = "To get info about current process's files-dependencies",
|
||||||
|
)]
|
||||||
|
Files,
|
||||||
|
#[command(
|
||||||
|
about = "To get info about current process's services-dependencies",
|
||||||
|
)]
|
||||||
|
Services,
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
mod cli;
|
||||||
|
|
||||||
|
pub use cli::*;
|
||||||
|
|
@ -0,0 +1,16 @@
|
||||||
|
mod cli;
|
||||||
|
mod net;
|
||||||
|
|
||||||
|
use clap::Parser;
|
||||||
|
use cli::Cli;
|
||||||
|
use net::{create_tcp_stream, try_send};
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<()>{
|
||||||
|
let cli = Cli::parse();
|
||||||
|
dbg!(&cli);
|
||||||
|
// println!("{:?}", cli);
|
||||||
|
try_send(create_tcp_stream().await, cli).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,27 @@
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::time::{Duration, sleep};
|
||||||
|
use anyhow::Result;
|
||||||
|
use super::Cli;
|
||||||
|
|
||||||
|
|
||||||
|
pub async fn create_tcp_stream() -> Result<TcpStream> {
|
||||||
|
let stream = TcpStream::connect("127.0.0.1:7753").await?;
|
||||||
|
Ok(stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn try_send(stream: Result<TcpStream>, params: Cli) -> Result<()> {
|
||||||
|
let mut stream = stream?;
|
||||||
|
loop {
|
||||||
|
if stream.writable().await.is_err() {
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let msg = format!("{:?}", params);
|
||||||
|
// let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@";
|
||||||
|
stream.write_all(msg.as_bytes()).await?;
|
||||||
|
// ...
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,18 @@
|
||||||
|
[package]
|
||||||
|
name = "noxis-rs"
|
||||||
|
version = "0.11.3"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0.93"
|
||||||
|
chrono = "0.4.38"
|
||||||
|
clap = { version = "4.5.21", features = ["derive"] }
|
||||||
|
env_logger = "0.11.3"
|
||||||
|
inotify = "0.10.2"
|
||||||
|
log = "0.4.22"
|
||||||
|
pcap = "2.2.0"
|
||||||
|
redis = "0.25.4"
|
||||||
|
serde = { version = "1.0.203", features = ["derive"] }
|
||||||
|
serde_json = "1.0.118"
|
||||||
|
sysinfo = "0.32.0"
|
||||||
|
tokio = { version = "1.38.0", features = ["full", "time"] }
|
||||||
|
|
@ -4,12 +4,12 @@
|
||||||
"processes": [
|
"processes": [
|
||||||
{
|
{
|
||||||
"name": "temp-process",
|
"name": "temp-process",
|
||||||
"path": "/home/user/monitor/runner-rs/temp-process",
|
"path": "./temp-process",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"files": [
|
"files": [
|
||||||
{
|
{
|
||||||
"filename": "dep-file",
|
"filename": "dep-file",
|
||||||
"src": "/home/user/monitor/runner-rs/tests/examples/",
|
"src": "./tests/examples/",
|
||||||
"triggers": {
|
"triggers": {
|
||||||
"onDelete": "stop",
|
"onDelete": "stop",
|
||||||
"onChange": "stay"
|
"onChange": "stay"
|
||||||
|
|
@ -1,27 +1,38 @@
|
||||||
mod options;
|
mod options;
|
||||||
mod utils;
|
mod utils;
|
||||||
|
|
||||||
|
use anyhow::Error;
|
||||||
|
use clap::Parser;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use options::config::*;
|
use options::config::*;
|
||||||
use options::logger::setup_logger;
|
use options::logger::setup_logger;
|
||||||
use options::signals::set_valid_destructor;
|
use options::signals::set_valid_destructor;
|
||||||
use options::structs::*;
|
use options::structs::Processes;
|
||||||
|
use options::cli_pipeline::init_cli_pipeline;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use utils::*;
|
use utils::*;
|
||||||
|
use options::preboot::PrebootParams;
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
async fn main() {
|
async fn main() -> anyhow::Result<()>{
|
||||||
|
let preboot = Arc::new(PrebootParams::parse().validate()?);
|
||||||
|
|
||||||
|
// if let Err(_) = preboot {
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
// let preboot = Arc::new(preboot);
|
||||||
|
|
||||||
let _ = setup_logger();
|
let _ = setup_logger();
|
||||||
|
|
||||||
info!("Runner is configurating...");
|
info!("Runner is configurating...");
|
||||||
|
|
||||||
// setting up redis connection \
|
// setting up redis connection \
|
||||||
// then conf checks to choose the most actual \
|
// then conf checks to choose the most actual \
|
||||||
let processes: Processes = get_actual_config().await.unwrap_or_else(|| {
|
let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
|
||||||
error!("No actual configuration for runner. Stopping...");
|
error!("No actual configuration for runner. Stopping...");
|
||||||
std::process::exit(101);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
|
|
@ -32,7 +43,7 @@ async fn main() {
|
||||||
|
|
||||||
if processes.processes.is_empty() {
|
if processes.processes.is_empty() {
|
||||||
error!("Processes list is null, runner-rs initialization is stopped");
|
error!("Processes list is null, runner-rs initialization is stopped");
|
||||||
return;
|
return Err(Error::msg("Empty processes segment in config"));
|
||||||
}
|
}
|
||||||
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
|
||||||
// is in need to send to the signals handler thread
|
// is in need to send to the signals handler thread
|
||||||
|
|
@ -75,13 +86,18 @@ async fn main() {
|
||||||
|
|
||||||
// remote config update subscription
|
// remote config update subscription
|
||||||
handler.push(tokio::spawn(async move {
|
handler.push(tokio::spawn(async move {
|
||||||
let _ = subscribe_config_stream(Arc::new(processes)).await;
|
let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
|
||||||
|
}));
|
||||||
|
|
||||||
|
// cli pipeline
|
||||||
|
handler.push(tokio::spawn(async move {
|
||||||
|
let _ = init_cli_pipeline().await;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
for i in handler {
|
for i in handler {
|
||||||
let _ = i.await;
|
let _ = i.await;
|
||||||
}
|
}
|
||||||
return;
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: integration tests
|
// todo: integration tests
|
||||||
|
|
@ -4,3 +4,5 @@ pub mod config;
|
||||||
pub mod logger;
|
pub mod logger;
|
||||||
pub mod signals;
|
pub mod signals;
|
||||||
pub mod structs;
|
pub mod structs;
|
||||||
|
pub mod preboot;
|
||||||
|
pub mod cli_pipeline;
|
||||||
|
|
@ -0,0 +1,60 @@
|
||||||
|
use log::{error, info, warn};
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use anyhow::{Result as DynResult, Error};
|
||||||
|
use tokio::time::{sleep, Duration};
|
||||||
|
use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}};
|
||||||
|
// use std::io::BufReader;
|
||||||
|
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
|
||||||
|
|
||||||
|
|
||||||
|
pub async fn init_cli_pipeline() -> DynResult<()> {
|
||||||
|
return match init_listener().await {
|
||||||
|
Some(list) => {
|
||||||
|
loop {
|
||||||
|
if let Ok((socket, addr)) = list.accept().await {
|
||||||
|
// isolation
|
||||||
|
if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() {
|
||||||
|
warn!("Declined attempt to connect TCP-socket from {}", addr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
process_connection(socket).await;
|
||||||
|
}
|
||||||
|
sleep(Duration::from_millis(500)).await;
|
||||||
|
}
|
||||||
|
// Ok(())
|
||||||
|
},
|
||||||
|
None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn init_listener() -> Option<TcpListener> {
|
||||||
|
return match TcpListener::bind("127.0.0.1:7753").await {
|
||||||
|
Ok(listener) => {
|
||||||
|
info!("Runner is listening localhost:7753");
|
||||||
|
Some(listener)
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Cannot create TCP listener for CLI");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_connection(mut stream: TcpStream) {
|
||||||
|
// loop{
|
||||||
|
// stream.
|
||||||
|
// }
|
||||||
|
let buf_reader = BufReader::new(stream.borrow_mut());
|
||||||
|
let mut rqst = buf_reader.lines();
|
||||||
|
|
||||||
|
|
||||||
|
while let Ok(Some(line)) = rqst.next_line().await {
|
||||||
|
if line.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
println!("{}", line);
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!";
|
||||||
|
stream.write_all(response.as_bytes()).await.unwrap();
|
||||||
|
}
|
||||||
|
|
@ -7,9 +7,11 @@ use std::os::unix::process::CommandExt;
|
||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{env, fs};
|
use std::{env, fs};
|
||||||
use tokio::time::Duration;
|
use std::fmt::format;
|
||||||
|
use super::preboot::PrebootParams;
|
||||||
|
use tokio::time::{Duration, sleep};
|
||||||
|
|
||||||
const CONFIG_PATH: &str = "settings.json";
|
// const CONFIG_PATH: &str = "settings.json";
|
||||||
|
|
||||||
/// # Fn `load_processes`
|
/// # Fn `load_processes`
|
||||||
/// ## for reading and parsing *local* storing config
|
/// ## for reading and parsing *local* storing config
|
||||||
|
|
@ -46,43 +48,48 @@ fn load_processes(json_filename: &str) -> Option<Processes> {
|
||||||
///
|
///
|
||||||
/// *depends on* : struct `Processes`
|
/// *depends on* : struct `Processes`
|
||||||
///
|
///
|
||||||
pub async fn get_actual_config() -> Option<Processes> {
|
pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes> {
|
||||||
// * if no local conf -> loop and +inf getting conf from redis server
|
// * if no local conf -> loop and +inf getting conf from redis server
|
||||||
// * if local conf -> once getting conf from redis server
|
// * if local conf -> once getting conf from redis server
|
||||||
match load_processes(CONFIG_PATH) {
|
let config_path = params.config.to_str()?;
|
||||||
|
info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url);
|
||||||
|
match load_processes(config_path) {
|
||||||
Some(local_conf) => {
|
Some(local_conf) => {
|
||||||
info!(
|
info!(
|
||||||
"Found local configuration, version - {}",
|
"Found local configuration, version - {}",
|
||||||
&local_conf.date_of_creation
|
&local_conf.date_of_creation
|
||||||
);
|
);
|
||||||
if let Some(remote_conf) =
|
if !params.no_remote_config {
|
||||||
// TODO : rework with pubsub mech
|
if let Some(remote_conf) =
|
||||||
once_get_remote_configuration(&format!("redis://{}/", local_conf.config_server))
|
// TODO : rework with pubsub mech
|
||||||
{
|
once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url))
|
||||||
return match config_comparing(&local_conf, &remote_conf) {
|
{
|
||||||
ConfigActuality::Local => {
|
return match config_comparing(&local_conf, &remote_conf) {
|
||||||
info!("Local config is actual");
|
ConfigActuality::Local => {
|
||||||
Some(local_conf)
|
info!("Local config is actual");
|
||||||
}
|
Some(local_conf)
|
||||||
ConfigActuality::Remote => {
|
|
||||||
info!("Pulled config is more actual. Saving changes!");
|
|
||||||
if save_new_config(&remote_conf, CONFIG_PATH).is_err() {
|
|
||||||
error!("Saving changes process failed due to unexpected error...")
|
|
||||||
}
|
}
|
||||||
Some(remote_conf)
|
ConfigActuality::Remote => {
|
||||||
}
|
info!("Pulled config is more actual. Saving changes!");
|
||||||
};
|
if save_new_config(&remote_conf, config_path).is_err() {
|
||||||
|
error!("Saving changes process failed due to unexpected error...")
|
||||||
|
}
|
||||||
|
Some(remote_conf)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Some(local_conf)
|
Some(local_conf)
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
warn!("No local valid conf was found. Trying to pull remote one...");
|
warn!("No local valid conf was found. Trying to pull remote one...");
|
||||||
let mut conn = get_connection_watcher(&open_watcher("redis://localhost/"));
|
if !params.no_remote_config {
|
||||||
let remote_config = get_remote_conf_watcher(&mut conn).await;
|
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url)));
|
||||||
if let Some(conf) = remote_config {
|
if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
|
||||||
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
|
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
|
||||||
let _ = save_new_config(&conf, CONFIG_PATH);
|
let _ = save_new_config(&conf, config_path);
|
||||||
return Some(conf);
|
return Some(conf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
@ -182,23 +189,23 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
|
||||||
if remote.is_none() {
|
if remote.is_none() {
|
||||||
error!("Pulled config is invalid. Check it in Redis Server");
|
error!("Pulled config is invalid. Check it in Redis Server");
|
||||||
}
|
}
|
||||||
return remote;
|
remote
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
error!("Cannot extract payload from new message. Check Redis Server state");
|
error!("Cannot extract payload from new message. Check Redis Server state");
|
||||||
return None;
|
None
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
warn!("Cannot get config from Redis Server. Empty channel");
|
warn!("Cannot get config from Redis Server. Empty channel");
|
||||||
return None;
|
None
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
error!("Redis subscription process failed. Check Redis configuration!");
|
error!("Redis subscription process failed. Check Redis configuration!");
|
||||||
return None;
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -311,8 +318,13 @@ fn restart_main_thread() -> std::io::Result<()> {
|
||||||
///
|
///
|
||||||
/// *depends on* : `Processes`
|
/// *depends on* : `Processes`
|
||||||
///
|
///
|
||||||
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(), CustomError> {
|
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
|
||||||
if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) {
|
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
|
||||||
|
|
||||||
|
if params.no_sub || params.no_remote_config {
|
||||||
|
return Err(CustomError::Fatal);
|
||||||
|
}
|
||||||
|
if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) {
|
||||||
if let Ok(mut conn) = client.get_connection() {
|
if let Ok(mut conn) = client.get_connection() {
|
||||||
match crate::utils::get_container_id() {
|
match crate::utils::get_container_id() {
|
||||||
Some(channel_name) => {
|
Some(channel_name) => {
|
||||||
|
|
@ -330,8 +342,8 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(),
|
||||||
match config_comparing(&actual_prcs, &remote_config) {
|
match config_comparing(&actual_prcs, &remote_config) {
|
||||||
ConfigActuality::Remote => {
|
ConfigActuality::Remote => {
|
||||||
warn!("Pulled config is actual. Saving and restarting...");
|
warn!("Pulled config is actual. Saving and restarting...");
|
||||||
if save_new_config(&remote_config, CONFIG_PATH).is_err() {
|
if save_new_config(&remote_config, config_path).is_err() {
|
||||||
error!("Error with saving new config to {}. Stopping sub mechanism...", &CONFIG_PATH);
|
error!("Error with saving new config to {}. Stopping sub mechanism...", config_path);
|
||||||
return Err(CustomError::Fatal);
|
return Err(CustomError::Fatal);
|
||||||
}
|
}
|
||||||
if restart_main_thread().is_err() {
|
if restart_main_thread().is_err() {
|
||||||
|
|
@ -352,7 +364,7 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
|
sleep(Duration::from_secs(30)).await;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
|
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
|
||||||
|
|
@ -433,7 +445,7 @@ fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomEr
|
||||||
Err(_) => Err(CustomError::Fatal),
|
Err(_) => Err(CustomError::Fatal),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => return Err(CustomError::Fatal),
|
Err(_) => Err(CustomError::Fatal),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => Err(CustomError::Fatal),
|
Err(_) => Err(CustomError::Fatal),
|
||||||
|
|
@ -61,8 +61,27 @@ pub fn setup_logger() -> Result<(), crate::options::structs::CustomError> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod logger_tests {
|
mod logger_tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
// #[test]
|
||||||
|
// fn setting_up_logger() {
|
||||||
|
// assert!(setup_logger().is_ok());
|
||||||
|
// }
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn setting_up_logger() {
|
fn setting_up_logger() {
|
||||||
assert!(setup_logger().is_ok());
|
Builder::new()
|
||||||
|
.format(move |buf, record| {
|
||||||
|
writeln!(
|
||||||
|
buf,
|
||||||
|
"|{}| {} [{}] - {}",
|
||||||
|
get_container_id().unwrap_or("NODE".to_string()).trim(),
|
||||||
|
Local::now().format("%d-%m-%Y %H:%M:%S"),
|
||||||
|
record.level(),
|
||||||
|
record.args(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.filter(None, LevelFilter::Info)
|
||||||
|
.target(env_logger::Target::Stdout)
|
||||||
|
.is_test(true)
|
||||||
|
.init();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -0,0 +1,194 @@
|
||||||
|
// module to handle pre-boot params of the monitor
|
||||||
|
use anyhow::{Result, Ok, Error};
|
||||||
|
use clap::Parser;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
#[derive(clap::ValueEnum, Debug, Clone)]
|
||||||
|
pub enum MetricsPrebootParams {
|
||||||
|
Full,
|
||||||
|
System,
|
||||||
|
Processes,
|
||||||
|
Net,
|
||||||
|
None,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for MetricsPrebootParams {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
MetricsPrebootParams::Full => write!(f, "full"),
|
||||||
|
MetricsPrebootParams::System => write!(f, "system"),
|
||||||
|
MetricsPrebootParams::Processes => write!(f, "processes"),
|
||||||
|
MetricsPrebootParams::Net => write!(f, "net"),
|
||||||
|
MetricsPrebootParams::None => write!(f, "none"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Parser)]
|
||||||
|
pub struct PrebootParams {
|
||||||
|
// actions
|
||||||
|
#[arg(
|
||||||
|
long = "no-hagent",
|
||||||
|
action,
|
||||||
|
conflicts_with="socket_path",
|
||||||
|
help="To disable work with host-agent"
|
||||||
|
)]
|
||||||
|
pub no_hostagent : bool,
|
||||||
|
#[arg(
|
||||||
|
long = "no-logs",
|
||||||
|
action,
|
||||||
|
conflicts_with="log_to",
|
||||||
|
help="To disable logs"
|
||||||
|
)]
|
||||||
|
pub no_logs: bool,
|
||||||
|
#[arg(
|
||||||
|
long = "refresh-logs",
|
||||||
|
action,
|
||||||
|
conflicts_with="no_logs",
|
||||||
|
help="To clear logs directory"
|
||||||
|
)]
|
||||||
|
pub refresh_logs : bool,
|
||||||
|
#[arg(
|
||||||
|
long = "no-remote-config",
|
||||||
|
action,
|
||||||
|
help="To disable work with remote config server",
|
||||||
|
conflicts_with="no_sub")]
|
||||||
|
pub no_remote_config : bool,
|
||||||
|
#[arg(
|
||||||
|
long = "no-sub",
|
||||||
|
action,
|
||||||
|
help="To disable subscription mechanism",
|
||||||
|
conflicts_with="no_remote_config")]
|
||||||
|
pub no_sub : bool,
|
||||||
|
|
||||||
|
// params (socket_path, log_to, remote_server_url, config)
|
||||||
|
#[arg(
|
||||||
|
long = "socket-path",
|
||||||
|
default_value="/var/run/enode/hostagent.sock",
|
||||||
|
conflicts_with="no_hostagent",
|
||||||
|
help="To set .sock file's path used in communication with host-agent"
|
||||||
|
)]
|
||||||
|
pub socket_path : PathBuf,
|
||||||
|
#[arg(
|
||||||
|
long = "log-to",
|
||||||
|
default_value="./",
|
||||||
|
conflicts_with="no_logs",
|
||||||
|
help="To set a path to logs directory"
|
||||||
|
)]
|
||||||
|
pub log_to : PathBuf,
|
||||||
|
#[arg(
|
||||||
|
long = "remote-server-url",
|
||||||
|
default_value="localhost",
|
||||||
|
conflicts_with="no_remote_config",
|
||||||
|
help = "To set url of remote config server using in remote config pulling mechanism"
|
||||||
|
)]
|
||||||
|
pub remote_server_url : String,
|
||||||
|
#[arg(
|
||||||
|
long = "config",
|
||||||
|
short,
|
||||||
|
default_value="settings.json",
|
||||||
|
help="To set local config file path"
|
||||||
|
)]
|
||||||
|
pub config : PathBuf,
|
||||||
|
|
||||||
|
// value enum params (metrics)
|
||||||
|
#[arg(
|
||||||
|
long = "metrics",
|
||||||
|
short,
|
||||||
|
default_value_t=MetricsPrebootParams::Full,
|
||||||
|
help="To set metrics grubbing mode"
|
||||||
|
)]
|
||||||
|
pub metrics: MetricsPrebootParams,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PrebootParams {
|
||||||
|
pub fn validate(self) -> Result<Self> {
|
||||||
|
if !self.socket_path.exists() && !self.no_hostagent {
|
||||||
|
return Err(Error::msg("Socket-file not found or Noxis can't read it. Cannot start"));
|
||||||
|
}
|
||||||
|
// existing log dir
|
||||||
|
if !self.log_to.exists() && !self.no_logs {
|
||||||
|
return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
|
||||||
|
}
|
||||||
|
// existing sock file
|
||||||
|
if !self.config.exists() {
|
||||||
|
return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
|
||||||
|
}
|
||||||
|
// redis server check
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// unit tests of preboot params parsing mech
|
||||||
|
#[cfg(test)]
|
||||||
|
mod preboot_unitests{
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parsing_zero_args() {
|
||||||
|
assert!(PrebootParams::try_parse_from(vec!["runner-rs"]).is_ok())
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn parsing_hagent_valid_args() {
|
||||||
|
assert!(PrebootParams::try_parse_from(vec![
|
||||||
|
"runner-rs",
|
||||||
|
"--socket-path", "/path/to/socket"
|
||||||
|
]).is_ok())
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn parsing_hagent_invalid_args() {
|
||||||
|
assert!(PrebootParams::try_parse_from(vec![
|
||||||
|
"runner-rs",
|
||||||
|
"--socket-path", "/path/to/socket",
|
||||||
|
"--no-hagent"
|
||||||
|
]).is_err())
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn parsing_log_valid_args() {
|
||||||
|
assert!(PrebootParams::try_parse_from(vec![
|
||||||
|
"runner-rs",
|
||||||
|
"--log-to", "/path/to/log/dir"
|
||||||
|
]).is_ok())
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn parsing_log_invalid_args() {
|
||||||
|
assert!(PrebootParams::try_parse_from(vec![
|
||||||
|
"runner-rs",
|
||||||
|
"--log-to /path/to/log/dir",
|
||||||
|
"--no-logs"
|
||||||
|
]).is_err())
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn parsing_config_valid_args() {
|
||||||
|
assert!(PrebootParams::try_parse_from(vec![
|
||||||
|
"runner-rs",
|
||||||
|
"--no-sub",
|
||||||
|
"--remote-server-url", "redis://127.0.0.1"
|
||||||
|
]).is_ok())
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn parsing_config_invalid_args_noremote_nosub() {
|
||||||
|
assert!(PrebootParams::try_parse_from(vec![
|
||||||
|
"runner-rs",
|
||||||
|
"--no-remote-config", "--no-sub"
|
||||||
|
]).is_err())
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn parsing_config_invalid_args_noremote_remoteurl() {
|
||||||
|
assert!(PrebootParams::try_parse_from(vec![
|
||||||
|
"runner-rs",
|
||||||
|
"--no-remote-config",
|
||||||
|
"--remote-server-url", "redis://127.0.0.1"
|
||||||
|
]).is_err())
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn parsing_metrics_args_using_value_enum() {
|
||||||
|
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "full"]).is_ok());
|
||||||
|
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "system"]).is_ok());
|
||||||
|
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "processes"]).is_ok());
|
||||||
|
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "net"]).is_ok());
|
||||||
|
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "none"]).is_ok());
|
||||||
|
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "unusual_value"]).is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
|
@ -5,6 +7,7 @@ use serde::{Deserialize, Serialize};
|
||||||
pub enum CustomError {
|
pub enum CustomError {
|
||||||
Fatal,
|
Fatal,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
pub enum ConfigActuality {
|
pub enum ConfigActuality {
|
||||||
Local,
|
Local,
|
||||||
|
|
@ -18,7 +21,7 @@ pub enum ConfigActuality {
|
||||||
///
|
///
|
||||||
/// *depends on* : `TrackingProcess`
|
/// *depends on* : `TrackingProcess`
|
||||||
///
|
///
|
||||||
/// ```
|
/// ``` json
|
||||||
/// {
|
/// {
|
||||||
/// -> "dateOfCreation": "1721381809104",
|
/// -> "dateOfCreation": "1721381809104",
|
||||||
/// -> "configServer": "localhost",
|
/// -> "configServer": "localhost",
|
||||||
|
|
@ -44,7 +47,7 @@ pub struct Processes {
|
||||||
///
|
///
|
||||||
/// *depends on* : `Dependencies`
|
/// *depends on* : `Dependencies`
|
||||||
///
|
///
|
||||||
/// ```
|
/// ``` json
|
||||||
/// ...
|
/// ...
|
||||||
/// "processes": [
|
/// "processes": [
|
||||||
/// -> {
|
/// -> {
|
||||||
|
|
@ -69,7 +72,7 @@ pub struct TrackingProcess {
|
||||||
///
|
///
|
||||||
/// *depends on* : `Files`, `Services`
|
/// *depends on* : `Files`, `Services`
|
||||||
///
|
///
|
||||||
/// ```
|
/// ``` json
|
||||||
/// ...
|
/// ...
|
||||||
/// "path": "/home/user/monitor/runner-rs/temp-process",
|
/// "path": "/home/user/monitor/runner-rs/temp-process",
|
||||||
/// -> "dependencies": {
|
/// -> "dependencies": {
|
||||||
|
|
@ -93,7 +96,7 @@ pub struct Dependencies {
|
||||||
///
|
///
|
||||||
/// *depends on* : `FileTriggers`
|
/// *depends on* : `FileTriggers`
|
||||||
///
|
///
|
||||||
/// ```
|
/// ``` json
|
||||||
/// ...
|
/// ...
|
||||||
/// "files": [
|
/// "files": [
|
||||||
/// -> {
|
/// -> {
|
||||||
|
|
@ -118,7 +121,7 @@ pub struct Files {
|
||||||
///
|
///
|
||||||
/// *depends on* : `ServiceTriggers`
|
/// *depends on* : `ServiceTriggers`
|
||||||
///
|
///
|
||||||
/// ```
|
/// ``` json
|
||||||
/// ...
|
/// ...
|
||||||
/// "services": [
|
/// "services": [
|
||||||
/// -> {
|
/// -> {
|
||||||
|
|
@ -143,7 +146,7 @@ pub struct Services {
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
/// ```
|
/// ``` json
|
||||||
/// ...
|
/// ...
|
||||||
/// "port": 443,
|
/// "port": 443,
|
||||||
/// -> "triggers": {
|
/// -> "triggers": {
|
||||||
|
|
@ -168,7 +171,7 @@ pub struct ServiceTriggers {
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
/// ```
|
/// ``` json
|
||||||
/// ...
|
/// ...
|
||||||
/// "src": "/home/user/monitor/runner-rs/tests/examples/",
|
/// "src": "/home/user/monitor/runner-rs/tests/examples/",
|
||||||
/// -> "triggers": {
|
/// -> "triggers": {
|
||||||
|
|
@ -0,0 +1,271 @@
|
||||||
|
pub mod files;
|
||||||
|
pub mod hagent;
|
||||||
|
pub mod metrics;
|
||||||
|
pub mod prcs;
|
||||||
|
pub mod services;
|
||||||
|
|
||||||
|
// TODO : saving current flags state
|
||||||
|
|
||||||
|
use crate::options::structs::CustomError;
|
||||||
|
use crate::options::structs::TrackingProcess;
|
||||||
|
use files::create_watcher;
|
||||||
|
use files::file_handler;
|
||||||
|
use inotify::Inotify;
|
||||||
|
use log::{error, warn};
|
||||||
|
use prcs::{
|
||||||
|
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
|
||||||
|
unfreeze_process,
|
||||||
|
};
|
||||||
|
use services::service_handler;
|
||||||
|
use std::process::Command;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::join;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::time::Duration;
|
||||||
|
|
||||||
|
const GET_ID_CMD: &str = "hostname";
|
||||||
|
|
||||||
|
/// # Fn `run_daemons`
|
||||||
|
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
|
||||||
|
///
|
||||||
|
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `&mut mpsc::Receiver<u8>`,
|
||||||
|
///
|
||||||
|
/// *output* : ()
|
||||||
|
///
|
||||||
|
/// *initiator* : main thread
|
||||||
|
///
|
||||||
|
/// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader
|
||||||
|
///
|
||||||
|
/// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher`
|
||||||
|
///
|
||||||
|
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
|
||||||
|
///
|
||||||
|
pub async fn run_daemons(
|
||||||
|
proc: Arc<TrackingProcess>,
|
||||||
|
tx: Arc<mpsc::Sender<u8>>,
|
||||||
|
rx: &mut mpsc::Receiver<u8>,
|
||||||
|
) {
|
||||||
|
// creating watchers + ---buffers---
|
||||||
|
let mut watchers: Vec<Inotify> = vec![];
|
||||||
|
for file in proc.dependencies.files.clone().into_iter() {
|
||||||
|
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
|
||||||
|
watchers.push(watcher);
|
||||||
|
} else {
|
||||||
|
let _ = tx.send(121).await;
|
||||||
|
}
|
||||||
|
// watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
|
||||||
|
}
|
||||||
|
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
|
||||||
|
Arc::new(tokio::sync::Mutex::new(watchers));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
|
||||||
|
tokio::select! {
|
||||||
|
_ = run_hand => continue,
|
||||||
|
_val = rx.recv() => {
|
||||||
|
if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
|
||||||
|
match val {
|
||||||
|
// 1 - File-dependency handling error -> terminating (after waiting)
|
||||||
|
1 => {
|
||||||
|
if is_active(&proc.name).await {
|
||||||
|
error!("File-dependency handling error: Terminating {} process ..." , &proc.name);
|
||||||
|
terminate_process(&proc.name).await;
|
||||||
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
|
}
|
||||||
|
// return;
|
||||||
|
},
|
||||||
|
// 2 - File-dependency handling error -> holding (after waiting)
|
||||||
|
2 => {
|
||||||
|
if !is_frozen(&proc.name).await {
|
||||||
|
error!("File-dependency handling error: Freezing {} process ..." , &proc.name);
|
||||||
|
freeze_process(&proc.name).await;
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// 3 - Running process error
|
||||||
|
3 => {
|
||||||
|
error!("Error due to starting {} process", &proc.name);
|
||||||
|
return Err(CustomError::Fatal)
|
||||||
|
},
|
||||||
|
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
|
||||||
|
4 => {
|
||||||
|
// warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
},
|
||||||
|
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
|
||||||
|
5 => {
|
||||||
|
if is_active(&proc.name).await {
|
||||||
|
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
|
||||||
|
terminate_process(&proc.name).await;
|
||||||
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
|
||||||
|
6 => {
|
||||||
|
// println!("holding {}-{}", proc.name, is_active(&proc.name).await);
|
||||||
|
if !is_frozen(&proc.name).await {
|
||||||
|
error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
|
||||||
|
freeze_process(&proc.name).await;
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// // 7 - File-dependency change -> terminating (after check)
|
||||||
|
7 => {
|
||||||
|
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
|
||||||
|
terminate_process(&proc.name).await;
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
return Err(CustomError::Fatal)
|
||||||
|
},
|
||||||
|
// // 8 - File-dependency change -> restarting (after check)
|
||||||
|
8 => {
|
||||||
|
warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
|
||||||
|
let _ = restart_process(&proc.name, &proc.path).await;
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
},
|
||||||
|
// // 9 - File-dependency change -> staying (after check)
|
||||||
|
9 => {
|
||||||
|
// no need to trash logs
|
||||||
|
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
},
|
||||||
|
|
||||||
|
// 10 - Process unfreaze call via file handler (or service handler)
|
||||||
|
10 | 11 => {
|
||||||
|
if is_frozen(&proc.name).await {
|
||||||
|
warn!("Unfreezing process {} call...", &proc.name);
|
||||||
|
unfreeze_process(&proc.name).await;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
},
|
||||||
|
// 11 - Process unfreaze call via service handler
|
||||||
|
// 11 => {
|
||||||
|
// if is_frozen(&proc.name).await {
|
||||||
|
// warn!("Unfreezing process {} call...", &proc.name);
|
||||||
|
// unfreeze_process(&proc.name).await;
|
||||||
|
// }
|
||||||
|
// tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
// },
|
||||||
|
// 101 - Impermissible trigger values in JSON
|
||||||
|
101 => {
|
||||||
|
error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", proc.name);
|
||||||
|
if is_active(&proc.name).await {
|
||||||
|
terminate_process(&proc.name).await;
|
||||||
|
}
|
||||||
|
return Err(CustomError::Fatal)
|
||||||
|
},
|
||||||
|
//
|
||||||
|
// 121 - Cannot create valid watcher for file dependency
|
||||||
|
121 => {
|
||||||
|
error!("Cannot create valid watcher for {}'s file dependency. Terminating thread...", proc.name);
|
||||||
|
let _ = terminate_process("runner-rs").await;
|
||||||
|
return Err(CustomError::Fatal)
|
||||||
|
},
|
||||||
|
// 111 - global thread termination with killing current child in a face
|
||||||
|
// of a current process
|
||||||
|
111 => {
|
||||||
|
warn!("Terminating {}'s child processes...", &proc.name);
|
||||||
|
match is_active(&proc.name).await {
|
||||||
|
true => {
|
||||||
|
terminate_process(&proc.name).await;
|
||||||
|
},
|
||||||
|
false => {
|
||||||
|
log::info!("Process {} is already terminated!", proc.name);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
// check process status daemon
|
||||||
|
/// # Fn `run_daemons`
|
||||||
|
/// ## func to async exec subjobs of checking process, services and files states
|
||||||
|
///
|
||||||
|
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
||||||
|
///
|
||||||
|
/// *output* : ()
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `run_daemons`
|
||||||
|
///
|
||||||
|
/// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers
|
||||||
|
///
|
||||||
|
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
|
||||||
|
///
|
||||||
|
pub async fn running_handler(
|
||||||
|
prc: Arc<TrackingProcess>,
|
||||||
|
tx: Arc<mpsc::Sender<u8>>,
|
||||||
|
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
||||||
|
) {
|
||||||
|
// services and files check (once)
|
||||||
|
let files_check = file_handler(
|
||||||
|
&prc.name,
|
||||||
|
&prc.dependencies.files,
|
||||||
|
tx.clone(),
|
||||||
|
watchers.clone(),
|
||||||
|
);
|
||||||
|
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
|
||||||
|
|
||||||
|
let res = join!(files_check, services_check);
|
||||||
|
// if inactive -> spawn checks -> active is true
|
||||||
|
if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
||||||
|
if start_process(&prc.name, &prc.path).await.is_err() {
|
||||||
|
tx.send(3).await.unwrap();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// if frozen -> spawn checks -> unfreeze is true
|
||||||
|
else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
||||||
|
tx.send(10).await.unwrap();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
|
||||||
|
/// # Fn `get_container_id`
|
||||||
|
/// ## for getting container id used in logs
|
||||||
|
///
|
||||||
|
/// *input* : -
|
||||||
|
///
|
||||||
|
/// *output* : Some(String) if cont-id was grubbed | None - if not
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `options::logger::setup_logger`
|
||||||
|
///
|
||||||
|
/// *managing* : -
|
||||||
|
///
|
||||||
|
/// *depends on* : -
|
||||||
|
///
|
||||||
|
pub fn get_container_id() -> Option<String> {
|
||||||
|
match Command::new(GET_ID_CMD).output() {
|
||||||
|
Ok(output) => {
|
||||||
|
if !output.status.success() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let id = String::from_utf8_lossy(&output.stdout).to_string();
|
||||||
|
if id.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(String::from_utf8_lossy(&output.stdout).to_string())
|
||||||
|
}
|
||||||
|
Err(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod utils_unittests {
|
||||||
|
use super::get_container_id;
|
||||||
|
#[test]
|
||||||
|
fn check_if_container_id_can_be_grabed() {
|
||||||
|
assert!(get_container_id().is_some());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -98,7 +98,10 @@ pub async fn file_handler(
|
||||||
// * watcher recreation after dealing with file recreation mechanism in text editors
|
// * watcher recreation after dealing with file recreation mechanism in text editors
|
||||||
let mutex = notify.borrow_mut();
|
let mutex = notify.borrow_mut();
|
||||||
|
|
||||||
*mutex = create_watcher(&file.filename, &file.src).await.unwrap();
|
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
|
||||||
|
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
|
||||||
|
*mutex = watcher;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
match file.triggers.on_change.as_str() {
|
match file.triggers.on_change.as_str() {
|
||||||
"stop" => {
|
"stop" => {
|
||||||
|
|
@ -159,22 +162,22 @@ mod files_unittests {
|
||||||
use super::*;
|
use super::*;
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn try_to_create_watcher() {
|
async fn try_to_create_watcher() {
|
||||||
let res = create_watcher("dep-file", "/home/user/monitor/runner-rs/tests/examples/").await;
|
let res = create_watcher("dep-file", "./tests/examples/").await;
|
||||||
assert!(res.is_ok());
|
assert!(res.is_ok());
|
||||||
}
|
}
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn try_to_create_invalid_watcher() {
|
async fn try_to_create_invalid_watcher() {
|
||||||
let res = create_watcher("invalid-file", "/path/to/the/hell").await;
|
let res = create_watcher("invalid-file", "/path/to/the/no/dir").await;
|
||||||
assert!(res.is_err());
|
assert!(res.is_err());
|
||||||
}
|
}
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn check_existing_file() {
|
async fn check_existing_file() {
|
||||||
let res = check_file("dep-file", "/home/user/monitor/runner-rs/tests/examples/").await;
|
let res = check_file("dep-file", "./tests/examples/").await;
|
||||||
assert!(res.is_ok());
|
assert!(res.is_ok());
|
||||||
}
|
}
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn check_non_existing_file() {
|
async fn check_non_existing_file() {
|
||||||
let res = check_file("invalid-file", "/path/to/the/hell").await;
|
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
|
||||||
assert!(res.is_err());
|
assert!(res.is_err());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1,5 +1,11 @@
|
||||||
|
//
|
||||||
// module needed to check host-agent health condition and to communicate with it
|
// module needed to check host-agent health condition and to communicate with it
|
||||||
|
//
|
||||||
use tokio::{io::Interest, net::UnixStream};
|
use tokio::{io::Interest, net::UnixStream};
|
||||||
|
use anyhow::{Ok, Result, Error};
|
||||||
|
// to kill lint bug
|
||||||
|
#[allow(unused_imports)]
|
||||||
|
use tokio::net::UnixListener;
|
||||||
|
|
||||||
/// # Fn `open_unix_socket`
|
/// # Fn `open_unix_socket`
|
||||||
/// ## opening unix-socket for host-agent communication
|
/// ## opening unix-socket for host-agent communication
|
||||||
|
|
@ -14,9 +20,10 @@ use tokio::{io::Interest, net::UnixStream};
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
async fn open_unix_socket() -> Result<UnixStream, std::io::Error> {
|
#[allow(dead_code)]
|
||||||
let socket = UnixStream::connect("/var/run/enode/hostagent.sock").await?;
|
async fn open_unix_socket(sock_path: &str) -> Result<UnixStream, std::io::Error> {
|
||||||
Ok(socket)
|
// "/var/run/enode/hostagent.sock"
|
||||||
|
UnixStream::connect(sock_path).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # Fn `ha_healthcheck`
|
/// # Fn `ha_healthcheck`
|
||||||
|
|
@ -32,15 +39,11 @@ async fn open_unix_socket() -> Result<UnixStream, std::io::Error> {
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
async fn ha_healthcheck(socket: &UnixStream) -> Result<(), std::io::Error >{
|
#[allow(dead_code)]
|
||||||
|
async fn ha_healthcheck(socket: &UnixStream) -> Result<(), Error> {
|
||||||
socket.ready(Interest::WRITABLE).await?;
|
socket.ready(Interest::WRITABLE).await?;
|
||||||
if socket.writable().await.is_ok() {
|
socket.writable().await?;
|
||||||
if let Err(er) = socket.try_write(b"Hello HAgent") {
|
socket.try_write(b"Hello HAgent")?;
|
||||||
return Err(er);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(std::io::ErrorKind::WouldBlock.into());
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -57,34 +60,37 @@ async fn ha_healthcheck(socket: &UnixStream) -> Result<(), std::io::Error >{
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), std::io::Error > {
|
#[allow(dead_code)]
|
||||||
|
async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), Error > {
|
||||||
socket.ready(Interest::WRITABLE).await?;
|
socket.ready(Interest::WRITABLE).await?;
|
||||||
if socket.writable().await.is_ok() {
|
socket.writable().await?;
|
||||||
if let Err(er) = socket.try_write(data.as_bytes()) {
|
socket.try_write(data.as_bytes())?;
|
||||||
return Err(er);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(std::io::ErrorKind::WouldBlock.into());
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod hagent_unittets {
|
mod hagent_unittets {
|
||||||
use super::*;
|
use super::*;
|
||||||
#[tokio::test]
|
const TEST_SOCKET: &str = "./tests/examples/hagent_test.sock";
|
||||||
// maybe bool : true -> alive, false -> dead
|
|
||||||
// simple request on api
|
async fn init_listener() -> UnixListener {
|
||||||
async fn hagent_healthcheck() {
|
let _ = std::fs::remove_file(TEST_SOCKET);
|
||||||
let sock = open_unix_socket().await;
|
UnixListener::bind(TEST_SOCKET).unwrap()
|
||||||
assert!(sock.is_ok());
|
|
||||||
let sock = sock.unwrap();
|
|
||||||
assert!(ha_healthcheck(&sock).await.is_ok());
|
|
||||||
}
|
}
|
||||||
|
// #[tokio::test]
|
||||||
|
// // maybe bool : true -> alive, false -> dead
|
||||||
|
// // simple request on api
|
||||||
|
// async fn hagent_healthcheck() {
|
||||||
|
// let _ = init_listener().await;
|
||||||
|
// let sock = open_unix_socket(TEST_SOCKET).await;
|
||||||
|
// assert!(sock.is_ok());
|
||||||
|
// let sock = sock.unwrap();
|
||||||
|
// assert!(ha_healthcheck(&sock).await.is_ok());
|
||||||
|
// }
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
// --Result<maybe Response>
|
// --Result<maybe Response>
|
||||||
// one-shot func
|
// one-shot func
|
||||||
async fn send_metrics_to_hagent() {
|
async fn hagent_communication_test() {
|
||||||
use crate::options::structs::{ProcessMetrics, ContainerMetrics, Metrics};
|
use crate::options::structs::{ProcessMetrics, ContainerMetrics, Metrics};
|
||||||
|
|
||||||
let procm = ProcessMetrics::new("test-prc", 15.0, 5.0);
|
let procm = ProcessMetrics::new("test-prc", 15.0, 5.0);
|
||||||
|
|
@ -92,7 +98,9 @@ mod hagent_unittets {
|
||||||
let metrics = Metrics::new(contm, vec![procm]);
|
let metrics = Metrics::new(contm, vec![procm]);
|
||||||
let metrics = &serde_json::to_string_pretty(&metrics).unwrap();
|
let metrics = &serde_json::to_string_pretty(&metrics).unwrap();
|
||||||
|
|
||||||
let sock = open_unix_socket().await;
|
#[allow(unused_mut)]
|
||||||
|
let mut _list = init_listener().await;
|
||||||
|
let sock = open_unix_socket(TEST_SOCKET).await;
|
||||||
assert!(sock.is_ok());
|
assert!(sock.is_ok());
|
||||||
let sock = sock.unwrap();
|
let sock = sock.unwrap();
|
||||||
assert!(ha_healthcheck(&sock).await.is_ok());
|
assert!(ha_healthcheck(&sock).await.is_ok());
|
||||||
|
|
@ -101,6 +109,6 @@ mod hagent_unittets {
|
||||||
}
|
}
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn open_unixsocket_test() {
|
async fn open_unixsocket_test() {
|
||||||
assert!(open_unix_socket().await.is_ok());
|
assert!(open_unix_socket("non/valid/socket/file.sock").await.is_err());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -27,6 +27,7 @@ use crate::utils::get_container_id;
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
|
#[allow(dead_code)]
|
||||||
pub async fn init_metrics_grubber() {
|
pub async fn init_metrics_grubber() {
|
||||||
let mut system = System::new();
|
let mut system = System::new();
|
||||||
// let mut buffer: Vec<PacketInfo> = vec![];
|
// let mut buffer: Vec<PacketInfo> = vec![];
|
||||||
|
|
@ -39,6 +40,8 @@ pub async fn init_metrics_grubber() {
|
||||||
// let _ = capture_packets(shared_buf.clone()).await;
|
// let _ = capture_packets(shared_buf.clone()).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
#[allow(unused_variables)]
|
||||||
async fn gather_metrics(proc: Arc<Process>) {
|
async fn gather_metrics(proc: Arc<Process>) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -92,6 +95,7 @@ async fn gather_metrics(proc: Arc<Process>) {
|
||||||
///
|
///
|
||||||
/// *depends on* : `TrackingProcess`
|
/// *depends on* : `TrackingProcess`
|
||||||
///
|
///
|
||||||
|
#[allow(dead_code)]
|
||||||
async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProcess>>) -> ContainerMetrics {
|
async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProcess>>) -> ContainerMetrics {
|
||||||
let metrics = join!(
|
let metrics = join!(
|
||||||
get_cpu_metrics_container(sys.clone()),
|
get_cpu_metrics_container(sys.clone()),
|
||||||
|
|
@ -119,6 +123,7 @@ async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProce
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
|
#[allow(dead_code)]
|
||||||
async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 {
|
async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 {
|
||||||
sys.global_cpu_usage()
|
sys.global_cpu_usage()
|
||||||
}
|
}
|
||||||
|
|
@ -136,6 +141,7 @@ async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 {
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
|
#[allow(dead_code)]
|
||||||
async fn get_ram_metrics_container(sys: Arc<System>) -> f32 {
|
async fn get_ram_metrics_container(sys: Arc<System>) -> f32 {
|
||||||
(sys.used_memory() / sys.total_memory()) as f32 * 100.0
|
(sys.used_memory() / sys.total_memory()) as f32 * 100.0
|
||||||
}
|
}
|
||||||
|
|
@ -156,6 +162,7 @@ async fn get_ram_metrics_container(sys: Arc<System>) -> f32 {
|
||||||
///
|
///
|
||||||
/// *depends on* : `TrackingProcess`
|
/// *depends on* : `TrackingProcess`
|
||||||
///
|
///
|
||||||
|
#[allow(dead_code)]
|
||||||
async fn get_subsystems(prcs: Arc<Vec<TrackingProcess>>) -> Vec<String> {
|
async fn get_subsystems(prcs: Arc<Vec<TrackingProcess>>) -> Vec<String> {
|
||||||
prcs.iter().map(|process| process.name.clone()).collect()
|
prcs.iter().map(|process| process.name.clone()).collect()
|
||||||
}
|
}
|
||||||
|
|
@ -173,6 +180,7 @@ async fn get_subsystems(prcs: Arc<Vec<TrackingProcess>>) -> Vec<String> {
|
||||||
///
|
///
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
|
#[allow(dead_code)]
|
||||||
async fn get_all_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> ProcessMetrics {
|
async fn get_all_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> ProcessMetrics {
|
||||||
let metrics = join!(
|
let metrics = join!(
|
||||||
get_cpu_metrics_process(proc.clone()),
|
get_cpu_metrics_process(proc.clone()),
|
||||||
|
|
@ -233,14 +233,15 @@ mod process_unittests {
|
||||||
// rewrite, its a pipe
|
// rewrite, its a pipe
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn full_cycle_with_restart() {
|
async fn full_cycle_with_restart() {
|
||||||
let res1 = start_process("temp-process", "/home/user/monitor/runner-rs/temp-process").await;
|
// let _ = std::io::stdout().write_all(b"");
|
||||||
|
let res1 = start_process("restart-prc", "./tests/examples/restart-prc").await;
|
||||||
assert!(res1.is_ok());
|
assert!(res1.is_ok());
|
||||||
let res2 =
|
let res2 =
|
||||||
restart_process("temp-process", "/home/user/monitor/runner-rs/temp-process").await;
|
restart_process("restart-prc", "./tests/examples/restart-prc").await;
|
||||||
assert!(res2.is_ok());
|
assert!(res2.is_ok());
|
||||||
let _ = terminate_process("temp-process").await;
|
let _ = terminate_process("restart-prc").await;
|
||||||
let res3 = is_active("temp-process").await;
|
let res3 = is_active("restart-prc").await;
|
||||||
assert!(res3);
|
assert!(!res3);
|
||||||
}
|
}
|
||||||
// rewrite, its a pipe
|
// rewrite, its a pipe
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
@ -249,7 +250,10 @@ mod process_unittests {
|
||||||
}
|
}
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn is_active_check() {
|
async fn is_active_check() {
|
||||||
assert!(is_active("systemd").await);
|
let res1 = start_process("tmp-prc", "./tests/examples/tmp-prc").await;
|
||||||
|
assert!(res1.is_ok());
|
||||||
|
assert!(is_active("tmp-prc").await);
|
||||||
|
let _ = terminate_process("tmp-prc").await;
|
||||||
}
|
}
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn isnt_active_check() {
|
async fn isnt_active_check() {
|
||||||
|
|
@ -257,11 +261,17 @@ mod process_unittests {
|
||||||
}
|
}
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn is_frozen_check() {
|
async fn is_frozen_check() {
|
||||||
assert!(!is_frozen("systemd").await);
|
let res1 = start_process("freeze-check", "./tests/examples/freeze-check").await;
|
||||||
|
assert!(res1.is_ok());
|
||||||
|
assert!(!is_frozen("freeze-check").await);
|
||||||
}
|
}
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn pidof_active_process() {
|
async fn pidof_active_process() {
|
||||||
assert!(get_pid("systemd").await.is_some());
|
assert!(get_pid("pidof-prc").await.is_none());
|
||||||
|
let res1 = start_process("pidof-prc", "./tests/examples/pidof-prc").await;
|
||||||
|
assert!(res1.is_ok());
|
||||||
|
assert!(get_pid("pidof-prc").await.is_some());
|
||||||
|
let _ = terminate_process("pidof-prc").await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// broken mechanism need to check
|
// broken mechanism need to check
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
265
src/utils.rs
265
src/utils.rs
|
|
@ -1,265 +0,0 @@
|
||||||
pub mod files;
|
|
||||||
pub mod hagent;
|
|
||||||
pub mod metrics;
|
|
||||||
pub mod prcs;
|
|
||||||
pub mod services;
|
|
||||||
|
|
||||||
//
|
|
||||||
|
|
||||||
use crate::options::structs::TrackingProcess;
|
|
||||||
use files::create_watcher;
|
|
||||||
use files::file_handler;
|
|
||||||
use inotify::Inotify;
|
|
||||||
use log::{error, warn};
|
|
||||||
use prcs::{
|
|
||||||
freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process,
|
|
||||||
unfreeze_process,
|
|
||||||
};
|
|
||||||
use services::service_handler;
|
|
||||||
use std::process::Command;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::join;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tokio::time::Duration;
|
|
||||||
|
|
||||||
const GET_ID_CMD: &str = "hostname";
|
|
||||||
|
|
||||||
/// # Fn `run_daemons`
|
|
||||||
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
|
|
||||||
///
|
|
||||||
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `&mut mpsc::Receiver<u8>`,
|
|
||||||
///
|
|
||||||
/// *output* : ()
|
|
||||||
///
|
|
||||||
/// *initiator* : main thread
|
|
||||||
///
|
|
||||||
/// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader
|
|
||||||
///
|
|
||||||
/// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher`
|
|
||||||
///
|
|
||||||
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
|
|
||||||
///
|
|
||||||
pub async fn run_daemons(
|
|
||||||
proc: Arc<TrackingProcess>,
|
|
||||||
tx: Arc<mpsc::Sender<u8>>,
|
|
||||||
rx: &mut mpsc::Receiver<u8>,
|
|
||||||
) {
|
|
||||||
// creating watchers + ---buffers---
|
|
||||||
let mut watchers: Vec<Inotify> = vec![];
|
|
||||||
for file in proc.dependencies.files.clone().into_iter() {
|
|
||||||
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
|
|
||||||
watchers.push(watcher);
|
|
||||||
} else {
|
|
||||||
let _ = tx.send(121).await;
|
|
||||||
}
|
|
||||||
// watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
|
|
||||||
}
|
|
||||||
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
|
|
||||||
Arc::new(tokio::sync::Mutex::new(watchers));
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
|
|
||||||
tokio::select! {
|
|
||||||
_ = run_hand => {},
|
|
||||||
_val = rx.recv() => {
|
|
||||||
match _val.unwrap() {
|
|
||||||
// 1 - File-dependency handling error -> terminating (after waiting)
|
|
||||||
1 => {
|
|
||||||
if is_active(&proc.name).await {
|
|
||||||
error!("File-dependency handling error: Terminating {} process ..." , &proc.name);
|
|
||||||
terminate_process(&proc.name).await;
|
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
// 2 - File-dependency handling error -> holding (after waiting)
|
|
||||||
2 => {
|
|
||||||
if !is_frozen(&proc.name).await {
|
|
||||||
error!("File-dependency handling error: Freezing {} process ..." , &proc.name);
|
|
||||||
freeze_process(&proc.name).await;
|
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
// 3 - Running process error
|
|
||||||
3 => {
|
|
||||||
error!("Error due to starting {} process", &proc.name);
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
|
|
||||||
4 => {
|
|
||||||
// warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
|
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
},
|
|
||||||
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
|
|
||||||
5 => {
|
|
||||||
if is_active(&proc.name).await {
|
|
||||||
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
|
|
||||||
terminate_process(&proc.name).await;
|
|
||||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
|
|
||||||
6 => {
|
|
||||||
// println!("holding {}-{}", proc.name, is_active(&proc.name).await);
|
|
||||||
if !is_frozen(&proc.name).await {
|
|
||||||
error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
|
|
||||||
freeze_process(&proc.name).await;
|
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
// // 7 - File-dependency change -> terminating (after check)
|
|
||||||
7 => {
|
|
||||||
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
|
|
||||||
terminate_process(&proc.name).await;
|
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
// // 8 - File-dependency change -> restarting (after check)
|
|
||||||
8 => {
|
|
||||||
warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
|
|
||||||
let _ = restart_process(&proc.name, &proc.path).await;
|
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
},
|
|
||||||
// // 9 - File-dependency change -> staying (after check)
|
|
||||||
9 => {
|
|
||||||
// no need to trash logs
|
|
||||||
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
|
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
},
|
|
||||||
|
|
||||||
// 10 - Process unfreaze call via file handler (or service handler)
|
|
||||||
10 | 11 => {
|
|
||||||
if is_frozen(&proc.name).await {
|
|
||||||
warn!("Unfreezing process {} call...", &proc.name);
|
|
||||||
unfreeze_process(&proc.name).await;
|
|
||||||
}
|
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
},
|
|
||||||
// 11 - Process unfreaze call via service handler
|
|
||||||
// 11 => {
|
|
||||||
// if is_frozen(&proc.name).await {
|
|
||||||
// warn!("Unfreezing process {} call...", &proc.name);
|
|
||||||
// unfreeze_process(&proc.name).await;
|
|
||||||
// }
|
|
||||||
// tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
// },
|
|
||||||
// 101 - Impermissible trigger values in JSON
|
|
||||||
101 => {
|
|
||||||
error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", proc.name);
|
|
||||||
if is_active(&proc.name).await {
|
|
||||||
terminate_process(&proc.name).await;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
//
|
|
||||||
// 121 - Cannot create valid watcher for file dependency
|
|
||||||
121 => {
|
|
||||||
error!("Cannot create valid watcher for {}'s file dependency. Terminating thread...", proc.name);
|
|
||||||
let _ = terminate_process("runner-rs").await;
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
// 111 - global thread termination with killing current child in a face
|
|
||||||
// of a current process
|
|
||||||
111 => {
|
|
||||||
warn!("Terminating {}'s child processes...", &proc.name);
|
|
||||||
match is_active(&proc.name).await {
|
|
||||||
true => {
|
|
||||||
terminate_process(&proc.name).await;
|
|
||||||
},
|
|
||||||
false => {
|
|
||||||
log::info!("Process {} is already terminated!", proc.name);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
_ => {},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
// check process status daemon
|
|
||||||
/// # Fn `run_daemons`
|
|
||||||
/// ## func to async exec subjobs of checking process, services and files states
|
|
||||||
///
|
|
||||||
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
|
|
||||||
///
|
|
||||||
/// *output* : ()
|
|
||||||
///
|
|
||||||
/// *initiator* : fn `run_daemons`
|
|
||||||
///
|
|
||||||
/// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers
|
|
||||||
///
|
|
||||||
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
|
|
||||||
///
|
|
||||||
pub async fn running_handler(
|
|
||||||
prc: Arc<TrackingProcess>,
|
|
||||||
tx: Arc<mpsc::Sender<u8>>,
|
|
||||||
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
|
|
||||||
) {
|
|
||||||
// services and files check (once)
|
|
||||||
let files_check = file_handler(
|
|
||||||
&prc.name,
|
|
||||||
&prc.dependencies.files,
|
|
||||||
tx.clone(),
|
|
||||||
watchers.clone(),
|
|
||||||
);
|
|
||||||
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
|
|
||||||
|
|
||||||
let res = join!(files_check, services_check);
|
|
||||||
// if inactive -> spawn checks -> active is true
|
|
||||||
if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
|
||||||
if start_process(&prc.name, &prc.path).await.is_err() {
|
|
||||||
tx.send(3).await.unwrap();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// if frozen -> spawn checks -> unfreeze is true
|
|
||||||
else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
|
|
||||||
tx.send(10).await.unwrap();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
|
|
||||||
/// # Fn `get_container_id`
|
|
||||||
/// ## for getting container id used in logs
|
|
||||||
///
|
|
||||||
/// *input* : -
|
|
||||||
///
|
|
||||||
/// *output* : Some(String) if cont-id was grubbed | None - if not
|
|
||||||
///
|
|
||||||
/// *initiator* : fn `options::logger::setup_logger`
|
|
||||||
///
|
|
||||||
/// *managing* : -
|
|
||||||
///
|
|
||||||
/// *depends on* : -
|
|
||||||
///
|
|
||||||
pub fn get_container_id() -> Option<String> {
|
|
||||||
match Command::new(GET_ID_CMD).output() {
|
|
||||||
Ok(output) => {
|
|
||||||
if !output.status.success() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
let id = String::from_utf8_lossy(&output.stdout).to_string();
|
|
||||||
if id.is_empty() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
Some(String::from_utf8_lossy(&output.stdout).to_string())
|
|
||||||
}
|
|
||||||
Err(_) => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod utils_unittests {
|
|
||||||
use super::get_container_id;
|
|
||||||
#[test]
|
|
||||||
fn check_if_container_id_can_be_grabed() {
|
|
||||||
assert!(get_container_id().is_some());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
BIN
temp-process
BIN
temp-process
Binary file not shown.
Loading…
Reference in New Issue