Compare commits
No commits in common. "621f2d46b7f5bb786172d80ed4ae461ec0d7123a" and "8c71453f2930b1589fb0b930e8194e89176de0ea" have entirely different histories.
621f2d46b7
...
8c71453f29
|
|
@ -3,9 +3,6 @@ name = "runner-rs"
|
||||||
version = "0.5.5"
|
version = "0.5.5"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[profile.dev]
|
|
||||||
debug = true
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
chrono = "0.4.38"
|
chrono = "0.4.38"
|
||||||
env_logger = "0.11.3"
|
env_logger = "0.11.3"
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
{
|
{
|
||||||
"dateOfCreation": "1721381809101",
|
"dateOfCreation": "1721381809090",
|
||||||
"processes": [
|
"processes": [
|
||||||
{
|
{
|
||||||
"name": "web-server",
|
"name": "web-server",
|
||||||
|
|
@ -55,7 +55,7 @@
|
||||||
"hostname": "google.com",
|
"hostname": "google.com",
|
||||||
"port": 443,
|
"port": 443,
|
||||||
"triggers": {
|
"triggers": {
|
||||||
"wait": 5,
|
"wait": 14,
|
||||||
"delay": 1,
|
"delay": 1,
|
||||||
"onLost": "stop"
|
"onLost": "stop"
|
||||||
}
|
}
|
||||||
|
|
@ -64,7 +64,7 @@
|
||||||
"hostname": "localhost",
|
"hostname": "localhost",
|
||||||
"port": 8080,
|
"port": 8080,
|
||||||
"triggers": {
|
"triggers": {
|
||||||
"wait": 6,
|
"wait": 10,
|
||||||
"delay": 2,
|
"delay": 2,
|
||||||
"onLost": "hold"
|
"onLost": "hold"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,7 @@
|
||||||
use crate::structs::*;
|
use crate::structs::*;
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use redis::{Client, Commands, Connection, RedisResult};
|
use redis::{Client, Commands, Connection, RedisResult};
|
||||||
use std::{env, fs};
|
use std::fs;
|
||||||
use std::os::unix::process::CommandExt;
|
|
||||||
use std::process::Command;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
|
|
||||||
static CONFIG_PATH: &str = "settings.json";
|
static CONFIG_PATH: &str = "settings.json";
|
||||||
|
|
@ -25,7 +22,6 @@ pub fn get_actual_config() -> Option<Processes> {
|
||||||
// let mut local = load_processes(&CONFIG_PATH);
|
// let mut local = load_processes(&CONFIG_PATH);
|
||||||
match load_processes(CONFIG_PATH) {
|
match load_processes(CONFIG_PATH) {
|
||||||
Some(local_conf) => {
|
Some(local_conf) => {
|
||||||
info!("Found local configuration, version - {}", &local_conf.date_of_creation);
|
|
||||||
if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") {
|
if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") {
|
||||||
return match config_comparing(&local_conf, &remote_conf) {
|
return match config_comparing(&local_conf, &remote_conf) {
|
||||||
ConfigActuality::Local => {
|
ConfigActuality::Local => {
|
||||||
|
|
@ -79,10 +75,7 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
match parse_extern_config(&config[0].1[0].1) {
|
match parse_extern_config(&config[0].1[0].1) {
|
||||||
Some(prcs) => {
|
Some(prcs) => Some(prcs),
|
||||||
info!("Config {} was pulled from Redis-Server", &prcs.date_of_creation);
|
|
||||||
Some(prcs)
|
|
||||||
},
|
|
||||||
None => {
|
None => {
|
||||||
error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!");
|
error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!");
|
||||||
None
|
None
|
||||||
|
|
@ -158,9 +151,14 @@ fn get_stream_info_watcher(conn: &mut Connection) {
|
||||||
}
|
}
|
||||||
fn invalid_config_watcher(conn: &mut Connection) -> Processes {
|
fn invalid_config_watcher(conn: &mut Connection) -> Processes {
|
||||||
loop {
|
loop {
|
||||||
if let Some(prcs) = get_remote_config(conn) {
|
let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
|
||||||
info!("Got new config from Redis-Server, version - {}", &prcs.date_of_creation);
|
if res.is_ok() {
|
||||||
return prcs;
|
let config = &res.unwrap()[0];
|
||||||
|
if !config.is_empty() {
|
||||||
|
if let Some(conf) = parse_extern_config(&config[0].1[0].1) {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
error!("Got INVALID configuration. Update config! Retrying...");
|
error!("Got INVALID configuration. Update config! Retrying...");
|
||||||
std::thread::sleep(Duration::from_secs(4));
|
std::thread::sleep(Duration::from_secs(4));
|
||||||
|
|
@ -169,53 +167,6 @@ fn invalid_config_watcher(conn: &mut Connection) -> Processes {
|
||||||
|
|
||||||
// ! end of watchers
|
// ! end of watchers
|
||||||
|
|
||||||
fn get_remote_config(conn: &mut Connection) -> Option<Processes> {
|
|
||||||
let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
|
|
||||||
if res.is_ok() {
|
|
||||||
let config = &res.unwrap()[0];
|
|
||||||
if !config.is_empty() {
|
|
||||||
return parse_extern_config(&config[0].1[0].1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
fn restart_main_thread() -> std::io::Result<()>{
|
|
||||||
let current_exe = env::current_exe()?;
|
|
||||||
Command::new(current_exe)
|
|
||||||
.exec();
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(), CustomError> {
|
|
||||||
if let Ok(client) = Client::open("redis://localhost") {
|
|
||||||
if let Ok(mut conn) = client.get_connection() {
|
|
||||||
info!("Runner subscribed on config update");
|
|
||||||
loop {
|
|
||||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
||||||
if let Some(prcs) = get_remote_config(&mut conn) {
|
|
||||||
match config_comparing(&actual_prcs, &prcs) {
|
|
||||||
ConfigActuality::Remote => {
|
|
||||||
info!("New config was pulled. Saving and restarting...");
|
|
||||||
if save_new_config(&prcs, CONFIG_PATH).is_err() {
|
|
||||||
error!("Error with saving new config to {}", &CONFIG_PATH);
|
|
||||||
return Err(CustomError::Fatal)
|
|
||||||
}
|
|
||||||
if restart_main_thread().is_err() {
|
|
||||||
error!("Error with restarting Runner. Stopping sub mechanism...");
|
|
||||||
return Err(CustomError::Fatal)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ => continue,
|
|
||||||
}
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
error!("Error with subscribing Redis stream on update. Working only with selected config...");
|
|
||||||
Err(CustomError::Fatal)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
||||||
let local_date: u64 = local.date_of_creation.parse().unwrap();
|
let local_date: u64 = local.date_of_creation.parse().unwrap();
|
||||||
let remote_date: u64 = remote.date_of_creation.parse().unwrap();
|
let remote_date: u64 = remote.date_of_creation.parse().unwrap();
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,6 @@ pub async fn file_handler(
|
||||||
}
|
}
|
||||||
match file.triggers.on_delete.as_str() {
|
match file.triggers.on_delete.as_str() {
|
||||||
"stay" => {
|
"stay" => {
|
||||||
tx.send(9).await.unwrap();
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
"stop" => {
|
"stop" => {
|
||||||
|
|
|
||||||
27
src/main.rs
27
src/main.rs
|
|
@ -12,7 +12,6 @@ use log::{error, info};
|
||||||
use logger::setup_logger;
|
use logger::setup_logger;
|
||||||
use signals::set_valid_destructor;
|
use signals::set_valid_destructor;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
|
||||||
use structs::*;
|
use structs::*;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use utils::*;
|
use utils::*;
|
||||||
|
|
@ -21,7 +20,7 @@ use utils::*;
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let _ = setup_logger();
|
let _ = setup_logger();
|
||||||
|
|
||||||
info!("Runner is configurating...");
|
log::info!("Runner is configurating...");
|
||||||
|
|
||||||
// setting up redis connection \
|
// setting up redis connection \
|
||||||
// then conf checks to choose the most actual \
|
// then conf checks to choose the most actual \
|
||||||
|
|
@ -30,13 +29,13 @@ async fn main() {
|
||||||
std::process::exit(101);
|
std::process::exit(101);
|
||||||
});
|
});
|
||||||
|
|
||||||
info!(
|
log::info!(
|
||||||
"Current runner configuration: {}",
|
"Current runner configuration: {}",
|
||||||
&processes.date_of_creation
|
&processes.date_of_creation
|
||||||
);
|
);
|
||||||
info!("Runner is ready. Initializing...");
|
log::info!("Runner is ready. Initializing...");
|
||||||
|
|
||||||
if processes.processes.is_empty() {
|
if processes.processes.len() == 0 {
|
||||||
error!("Processes list is null, runner-rs initialization is stopped");
|
error!("Processes list is null, runner-rs initialization is stopped");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -45,7 +44,7 @@ async fn main() {
|
||||||
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
|
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
|
||||||
|
|
||||||
for proc in processes.processes.iter() {
|
for proc in processes.processes.iter() {
|
||||||
info!(
|
log::info!(
|
||||||
"Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
|
"Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
|
||||||
proc.name,
|
proc.name,
|
||||||
proc.path,
|
proc.path,
|
||||||
|
|
@ -66,28 +65,16 @@ async fn main() {
|
||||||
});
|
});
|
||||||
handler.push(event);
|
handler.push(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
// destructor addition
|
// destructor addition
|
||||||
handler.push(tokio::spawn(async move {
|
handler.push(tokio::spawn(async move {
|
||||||
if let Err(_) = set_valid_destructor(Arc::new(senders)).await {
|
if let Err(_) = set_valid_destructor(Arc::new(senders)).await {
|
||||||
error!("Linux signals handler creation failed. Terminating main thread...");
|
error!("Linux signals handler creation failed. Returning...");
|
||||||
return;
|
|
||||||
}
|
|
||||||
// todo: rework this temp construction, use async/await in signals mod
|
|
||||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
|
||||||
info!("End of job. Terminating main thread...");
|
|
||||||
std::process::exit(0);
|
|
||||||
}));
|
|
||||||
|
|
||||||
// remote config update subscription
|
|
||||||
handler.push(tokio::spawn(async move {
|
|
||||||
if let Err(_) = subscribe_config_stream(Arc::new(processes)).await {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
for i in handler {
|
for i in handler {
|
||||||
i.await.unwrap();
|
i.await.unwrap();
|
||||||
}
|
}
|
||||||
|
info!("End of job. Terminating main thread...");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -31,9 +31,9 @@ pub async fn service_handler(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"hold" => {
|
"hold" => {
|
||||||
// if is_frozen(name).await {
|
if is_frozen(name).await {
|
||||||
// return Err(CustomError::Fatal);
|
return Err(CustomError::Fatal);
|
||||||
// }
|
}
|
||||||
if looped_service_connecting(name, serv).await.is_err() {
|
if looped_service_connecting(name, serv).await.is_err() {
|
||||||
tx.send(6).await.unwrap();
|
tx.send(6).await.unwrap();
|
||||||
tokio::time::sleep(Duration::from_millis(400)).await;
|
tokio::time::sleep(Duration::from_millis(400)).await;
|
||||||
|
|
@ -53,7 +53,6 @@ pub async fn service_handler(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
|
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
|
||||||
let mut counter = 0;
|
|
||||||
if serv.triggers.wait == 0 {
|
if serv.triggers.wait == 0 {
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||||
|
|
@ -80,9 +79,8 @@ async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), Cu
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
while start.elapsed().as_secs() < serv.triggers.wait.into() {
|
while start.elapsed().as_secs() < serv.triggers.wait.into() {
|
||||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||||
counter += 1;
|
|
||||||
warn!(
|
warn!(
|
||||||
"{counter} Attempting to connect from {} process to {}:{}",
|
"Attempting to connect from {} process to {}:{}",
|
||||||
&name, &serv.hostname, &serv.port
|
&name, &serv.hostname, &serv.port
|
||||||
);
|
);
|
||||||
match check_service(&serv.hostname, &serv.port).await {
|
match check_service(&serv.hostname, &serv.port).await {
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,7 @@ pub async fn run_daemons(
|
||||||
},
|
},
|
||||||
// // 9 - File-dependency change -> staying (after check)
|
// // 9 - File-dependency change -> staying (after check)
|
||||||
9 => {
|
9 => {
|
||||||
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
|
warn!("File-dependency warning (file changed). Ignoring on {} process...", &proc.name);
|
||||||
},
|
},
|
||||||
|
|
||||||
// 10 - Process unfreaze call via file handler
|
// 10 - Process unfreaze call via file handler
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue