From b6810a8c524c784db5e1f9ea1fd01e2ac284702a Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 23 Jul 2024 16:17:05 +0300 Subject: [PATCH] redis conf sync update --- Cargo.lock | 2 +- Cargo.toml | 2 +- settings.json | 140 ++++++++++++++++++++------------------ src/main.rs | 182 ++++++++++++++++++++++++++++++++++---------------- 4 files changed, 201 insertions(+), 125 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5cfa34c..ed7a62e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -505,7 +505,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "runner-rs" -version = "0.4.5" +version = "0.4.9" dependencies = [ "chrono", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index f5b42a9..80b67c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "runner-rs" -version = "0.4.5" +version = "0.4.9" edition = "2021" [dependencies] diff --git a/settings.json b/settings.json index b13319a..094faf2 100644 --- a/settings.json +++ b/settings.json @@ -1,65 +1,77 @@ { - "id" : 1, - "processes" : [{ - "name" : "web-server", - "path" : "/home/vladislav/web/web-server", - "dependencies" : { - "files" : [{ - "filename" : "control-file", - "src" : "/home/vladislav/web/", - "triggers" : { - "onDelete" : "stop", - "onChange" : "restart" - } - }, - { - "filename" : "config-file", - "src" : "/home/vladislav/web/", - "triggers" : { - "onDelete" : "stop", - "onChange" : "restart" - } - }], - "services" : [{ - "hostname" : "ya.ru", - "port" : 443, - "triggers" : { - "wait" : 6, - "delay" : 1, - "onLost" : "stop" - } - }] - } - }, - { - "name" : "temp-process", - "path" : "/home/vladislav/web/temp-process", - "dependencies" : { - "files" : [{ - "filename" : "control-file", - "src" : "/home/vladislav/web/", - "triggers" : { - "onDelete" : "hold", - "onChange" : "restart" - } - }], - "services" : [{ - "hostname" : "google.com", - "port" : 443, - "triggers" : { - "wait" : 14, - "delay" : 1, - "onLost" : "stop" - } - }, { - "hostname" : "localhost", - "port" : 8080, - "triggers" : { - "wait" : 10, - "delay" : 2, - "onLost" : "hold" - } - }] - } - }] -} + "id": 1, + "dateOfCreation": "1721381809090", + "processes": [ + { + "name": "web-server", + "path": "/home/vladislav/web/web-server", + "dependencies": { + "files": [ + { + "filename": "control-file", + "src": "/home/vladislav/web/", + "triggers": { + "onDelete": "stop", + "onChange": "restart" + } + }, + { + "filename": "config-file", + "src": "/home/vladislav/web/", + "triggers": { + "onDelete": "stop", + "onChange": "restart" + } + } + ], + "services": [ + { + "hostname": "ya.ru", + "port": 443, + "triggers": { + "wait": 6, + "delay": 1, + "onLost": "stop" + } + } + ] + } + }, + { + "name": "temp-process", + "path": "/home/vladislav/web/temp-process", + "dependencies": { + "files": [ + { + "filename": "control-file", + "src": "/home/vladislav/web/", + "triggers": { + "onDelete": "hold", + "onChange": "restart" + } + } + ], + "services": [ + { + "hostname": "google.com", + "port": 443, + "triggers": { + "wait": 14, + "delay": 1, + "onLost": "stop" + } + }, + { + "hostname": "localhost", + "port": 8080, + "triggers": { + "wait": 10, + "delay": 2, + "onLost": "hold" + } + } + ] + } + } + ] +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index c0f62ea..e19c00e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,9 @@ -use redis::{Commands, ConnectionAddr}; +use redis::Commands; // json parsing use serde::{ Deserialize, Serialize }; use serde_json; // async multi-threaded execution -use tokio::time::{ Duration, Instant }; +use tokio::time::{ error, Duration, Instant }; use tokio::sync::mpsc; use tokio::join; // fatal errors handler @@ -19,10 +19,12 @@ use std::sync::Arc; use inotify::{ Inotify, WatchMask }; // logging use std::io::Write; -use chrono::{ Local, NaiveDate }; +use chrono::{Local, Utc}; use env_logger::Builder; use log::{error, warn, LevelFilter}; +static CONFIG_PATH : &'static str = "settings.json"; + /// # an Error enum (nextly will be deleted and replaced) enum CustomError { Fatal, @@ -34,6 +36,8 @@ enum CustomError { struct Processes { #[serde(rename="id")] runner_id: usize, + #[serde(rename="dateOfCreation")] + date_of_creation : String, #[serde(default)] processes : Vec, } @@ -98,6 +102,7 @@ struct FIleTriggers { #[tokio::main(flavor = "multi_thread")] async fn main() { + // building logger with current output format Builder::new() .format(|buf, record|{ writeln!(buf, @@ -110,30 +115,15 @@ async fn main() { .filter(None, LevelFilter::Info) .init(); - // OLD STYLE OF USING LOCAL CONFIG FILE - // let processes = load_processes("settings.json"); - // let mut error_counter = 0; + // setting up redis connection \ + // then conf checks to choose the most actual \ let client = redis::Client::open("redis://127.0.0.1").expect("error connecting redis server"); let mut conn = client.get_connection().expect("error getting connection"); - // let mut pubsub = conn.as_pubsub(); - // let _ = pubsub.subscribe("config"); - log::info!("Waitng for a config file in DB ..."); - match conn.xlen::<&str, usize>("config_stream") { - Ok(len) => { - if len > 0 { - } else { - - } - }, - Err(er) => { - error!("Cannot find needed stream to get configuration"); - return; - }, - } + log::info!("Waitng for a config file from DB ..."); if let Ok(len) = conn.xlen::<&str, usize>("config_stream") { if len <= 0 { - warn!("No configuration yet. Waiting ..."); + warn!("No configuration in DB yet. Waiting ..."); while conn.xlen::<&str, usize>("config_stream").unwrap() <= 0 { std::thread::sleep(Duration::from_millis(400)); } @@ -142,47 +132,113 @@ async fn main() { error!("Cannot find needed stream to get configuration"); return; } - // error - let conf: redis::RedisResult)>> = conn.xrevrange_count("config_stream", "+", "-", 1); + + let conf: redis::RedisResult)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); let mut config: Vec<(String, Vec<(String, String)>)> = Vec::new(); - println!("external config: {:?}", config); if conf.is_ok() { - config = conf.unwrap(); + // guarranted safe unwrapping + config = conf.unwrap()[0].clone(); + let idx = config[0].0.find('-').unwrap(); + let true_milis: i64 = config[0].0 + .chars() + .enumerate() + .filter(|(id, _)| *id < idx) + .map(|(_, chr)| chr) + .collect::() + .parse() + .unwrap(); + let date = chrono::DateTime::from_timestamp_millis(true_milis).unwrap(); + log::info!("Log from {} was given", date); } else { + // rewrite error handler println!("error: {}", conf.unwrap_err()); } - let processes: Option; + + let processes: Processes; if config.is_empty() { error!("No suitable configs were given!"); log::info!("Trying to get local config..."); - processes = load_processes("settings.json"); + // matching check + match load_processes(CONFIG_PATH) { + Some(prcs) => processes = prcs, + None => { + error!("No config for runner. Cannot continue running..."); + return; + }, + } } else { - println!("{:?}+++{}", std::time::SystemTime::now(), &config[1].0); - processes = parse_extern_config(&config[1].0); - } - // let processes: Processes = { - // let mut _temp = String::new(); - // loop { - // if let Ok(msg) = pubsub.get_message() { - // log::info!("Actual configuration is found !"); - // _temp = msg.get_payload::().expect("error getting payload"); - // break; - // } else { - // continue; - // }; - // } - // parse_extern_config(&_temp) - // }; + // matching check + match parse_extern_config(&config[0].1[0].1) { + Some(prcs) => { + // + // comparing and optionally rewriting config file + // + if let Some(local) = load_processes(CONFIG_PATH) { + // comparing + let local_date: u64 = local.date_of_creation.parse().unwrap(); + let external_date: u64 = prcs.date_of_creation.parse().unwrap(); + match local_date.cmp(&external_date) { + std::cmp::Ordering::Less => { + // server has sent newest version + warn!("Updating config (new version - {})...", &native_date_from_milis(&prcs.date_of_creation).unwrap()); + processes = prcs; + let _ = save_new_config(&processes, CONFIG_PATH); + }, + std::cmp::Ordering::Equal => { + // server has sent equal version + processes = prcs; + }, + std::cmp::Ordering::Greater => { + // server has sent an elder version + log::info!("Local config version is more actual"); + processes = local; + }, + } + } else { + // settings.json can be deleted + processes = prcs; + match std::fs::File::create(CONFIG_PATH) { + Ok(_) => { + // saving server conf (truncating or adding new) + let _ = save_new_config(&processes, CONFIG_PATH); + }, + Err(_) => { + error!("Cannot create or truncate local config file"); + }, + } - if let None = processes { - error!("no config (temp log)"); - return; - } - let processes = processes.unwrap(); + } + }, + None => { + //temp + warn!("External config file parsing failed. Trying to get local config..."); + if let Some(prcs) = load_processes(CONFIG_PATH) { + processes = prcs; + let date_of_creation: Result = processes.date_of_creation.parse(); + if date_of_creation.is_err() { + error!("Local config date of creation parsing failed. Returning..."); + return; + } else { + let date_of_creation = date_of_creation.unwrap(); + if let Some(date) = chrono::DateTime::from_timestamp_millis(date_of_creation) { + log::info!("Config from {} is actual now!", date); + } else { + error!("Local config date of creation parsing failed. Returning..."); + return; + } + } + } else { + error!("No config for runner. Cannot continue running..."); + return; + } + }, + } + } + + log::info!("Current runner configuration: {}\n", &processes.date_of_creation); if processes.processes.len() == 0 { error!("Processes list is null, runner-rs initialization is stopped"); - // eprintln!("Error: Processes list is null, runner-rs initialization is stopped"); return; } let mut handler: Vec> = vec![]; @@ -195,13 +251,6 @@ async fn main() { proc.dependencies.services.len() ); - // println!("Process '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n", - // proc.name, - // proc.path, - // proc.dependencies.files.len(), - // proc.dependencies.services.len() - // ); - // creating msg channel // can or should be executed in new thread let (tx, mut rx) = mpsc::channel::(1); @@ -219,8 +268,23 @@ async fn main() { return; } -fn save_new_config(config: &str) -> Result<(), CustomError> { - Ok(()) +fn native_date_from_milis(mls: &str) -> Option> { + match mls.parse::(){ + Ok(val) => return chrono::DateTime::from_timestamp_millis(val), + Err(_) => return None, + } +} + +fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> { + match serde_json::to_string_pretty(&config) { + Ok(st) => { + match fs::write(config_file, st) { + Ok(_) => return Ok(()), + Err(_) => return Err(CustomError::Fatal), + } + }, + Err(_) => return Err(CustomError::Fatal), + } } async fn create_watcher(filename: &str, path: &str) -> Result {