redis conf sync update

pull/9/head
prplV 2024-07-23 16:17:05 +03:00
parent 212f2f2a24
commit b6810a8c52
4 changed files with 201 additions and 125 deletions

2
Cargo.lock generated
View File

@ -505,7 +505,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
[[package]] [[package]]
name = "runner-rs" name = "runner-rs"
version = "0.4.5" version = "0.4.9"
dependencies = [ dependencies = [
"chrono", "chrono",
"env_logger", "env_logger",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "runner-rs" name = "runner-rs"
version = "0.4.5" version = "0.4.9"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -1,10 +1,13 @@
{ {
"id": 1, "id": 1,
"processes" : [{ "dateOfCreation": "1721381809090",
"processes": [
{
"name": "web-server", "name": "web-server",
"path": "/home/vladislav/web/web-server", "path": "/home/vladislav/web/web-server",
"dependencies": { "dependencies": {
"files" : [{ "files": [
{
"filename": "control-file", "filename": "control-file",
"src": "/home/vladislav/web/", "src": "/home/vladislav/web/",
"triggers": { "triggers": {
@ -19,8 +22,10 @@
"onDelete": "stop", "onDelete": "stop",
"onChange": "restart" "onChange": "restart"
} }
}], }
"services" : [{ ],
"services": [
{
"hostname": "ya.ru", "hostname": "ya.ru",
"port": 443, "port": 443,
"triggers": { "triggers": {
@ -28,22 +33,26 @@
"delay": 1, "delay": 1,
"onLost": "stop" "onLost": "stop"
} }
}] }
]
} }
}, },
{ {
"name": "temp-process", "name": "temp-process",
"path": "/home/vladislav/web/temp-process", "path": "/home/vladislav/web/temp-process",
"dependencies": { "dependencies": {
"files" : [{ "files": [
{
"filename": "control-file", "filename": "control-file",
"src": "/home/vladislav/web/", "src": "/home/vladislav/web/",
"triggers": { "triggers": {
"onDelete": "hold", "onDelete": "hold",
"onChange": "restart" "onChange": "restart"
} }
}], }
"services" : [{ ],
"services": [
{
"hostname": "google.com", "hostname": "google.com",
"port": 443, "port": 443,
"triggers": { "triggers": {
@ -51,7 +60,8 @@
"delay": 1, "delay": 1,
"onLost": "stop" "onLost": "stop"
} }
}, { },
{
"hostname": "localhost", "hostname": "localhost",
"port": 8080, "port": 8080,
"triggers": { "triggers": {
@ -59,7 +69,9 @@
"delay": 2, "delay": 2,
"onLost": "hold" "onLost": "hold"
} }
}]
} }
}] ]
}
}
]
} }

View File

@ -1,9 +1,9 @@
use redis::{Commands, ConnectionAddr}; use redis::Commands;
// json parsing // json parsing
use serde::{ Deserialize, Serialize }; use serde::{ Deserialize, Serialize };
use serde_json; use serde_json;
// async multi-threaded execution // async multi-threaded execution
use tokio::time::{ Duration, Instant }; use tokio::time::{ error, Duration, Instant };
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::join; use tokio::join;
// fatal errors handler // fatal errors handler
@ -19,10 +19,12 @@ use std::sync::Arc;
use inotify::{ Inotify, WatchMask }; use inotify::{ Inotify, WatchMask };
// logging // logging
use std::io::Write; use std::io::Write;
use chrono::{ Local, NaiveDate }; use chrono::{Local, Utc};
use env_logger::Builder; use env_logger::Builder;
use log::{error, warn, LevelFilter}; use log::{error, warn, LevelFilter};
static CONFIG_PATH : &'static str = "settings.json";
/// # an Error enum (nextly will be deleted and replaced) /// # an Error enum (nextly will be deleted and replaced)
enum CustomError { enum CustomError {
Fatal, Fatal,
@ -34,6 +36,8 @@ enum CustomError {
struct Processes { struct Processes {
#[serde(rename="id")] #[serde(rename="id")]
runner_id: usize, runner_id: usize,
#[serde(rename="dateOfCreation")]
date_of_creation : String,
#[serde(default)] #[serde(default)]
processes : Vec<TrackingProcess>, processes : Vec<TrackingProcess>,
} }
@ -98,6 +102,7 @@ struct FIleTriggers {
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() { async fn main() {
// building logger with current output format
Builder::new() Builder::new()
.format(|buf, record|{ .format(|buf, record|{
writeln!(buf, writeln!(buf,
@ -110,30 +115,15 @@ async fn main() {
.filter(None, LevelFilter::Info) .filter(None, LevelFilter::Info)
.init(); .init();
// OLD STYLE OF USING LOCAL CONFIG FILE // setting up redis connection \
// let processes = load_processes("settings.json"); // then conf checks to choose the most actual \
// let mut error_counter = 0;
let client = redis::Client::open("redis://127.0.0.1").expect("error connecting redis server"); let client = redis::Client::open("redis://127.0.0.1").expect("error connecting redis server");
let mut conn = client.get_connection().expect("error getting connection"); let mut conn = client.get_connection().expect("error getting connection");
// let mut pubsub = conn.as_pubsub();
// let _ = pubsub.subscribe("config");
log::info!("Waitng for a config file in DB ...");
match conn.xlen::<&str, usize>("config_stream") {
Ok(len) => {
if len > 0 {
} else { log::info!("Waitng for a config file from DB ...");
}
},
Err(er) => {
error!("Cannot find needed stream to get configuration");
return;
},
}
if let Ok(len) = conn.xlen::<&str, usize>("config_stream") { if let Ok(len) = conn.xlen::<&str, usize>("config_stream") {
if len <= 0 { if len <= 0 {
warn!("No configuration yet. Waiting ..."); warn!("No configuration in DB yet. Waiting ...");
while conn.xlen::<&str, usize>("config_stream").unwrap() <= 0 { while conn.xlen::<&str, usize>("config_stream").unwrap() <= 0 {
std::thread::sleep(Duration::from_millis(400)); std::thread::sleep(Duration::from_millis(400));
} }
@ -142,47 +132,113 @@ async fn main() {
error!("Cannot find needed stream to get configuration"); error!("Cannot find needed stream to get configuration");
return; return;
} }
// error
let conf: redis::RedisResult<Vec<(String, Vec<(String, String)>)>> = conn.xrevrange_count("config_stream", "+", "-", 1); let conf: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>> = conn.xrevrange_count("config_stream", "+", "-", 1);
let mut config: Vec<(String, Vec<(String, String)>)> = Vec::new(); let mut config: Vec<(String, Vec<(String, String)>)> = Vec::new();
println!("external config: {:?}", config);
if conf.is_ok() { if conf.is_ok() {
config = conf.unwrap(); // guarranted safe unwrapping
config = conf.unwrap()[0].clone();
let idx = config[0].0.find('-').unwrap();
let true_milis: i64 = config[0].0
.chars()
.enumerate()
.filter(|(id, _)| *id < idx)
.map(|(_, chr)| chr)
.collect::<String>()
.parse()
.unwrap();
let date = chrono::DateTime::from_timestamp_millis(true_milis).unwrap();
log::info!("Log from {} was given", date);
} else { } else {
// rewrite error handler
println!("error: {}", conf.unwrap_err()); println!("error: {}", conf.unwrap_err());
} }
let processes: Option<Processes>;
let processes: Processes;
if config.is_empty() { if config.is_empty() {
error!("No suitable configs were given!"); error!("No suitable configs were given!");
log::info!("Trying to get local config..."); log::info!("Trying to get local config...");
processes = load_processes("settings.json"); // matching check
} else { match load_processes(CONFIG_PATH) {
println!("{:?}+++{}", std::time::SystemTime::now(), &config[1].0); Some(prcs) => processes = prcs,
processes = parse_extern_config(&config[1].0); None => {
error!("No config for runner. Cannot continue running...");
return;
},
}
} else {
// matching check
match parse_extern_config(&config[0].1[0].1) {
Some(prcs) => {
//
// comparing and optionally rewriting config file
//
if let Some(local) = load_processes(CONFIG_PATH) {
// comparing
let local_date: u64 = local.date_of_creation.parse().unwrap();
let external_date: u64 = prcs.date_of_creation.parse().unwrap();
match local_date.cmp(&external_date) {
std::cmp::Ordering::Less => {
// server has sent newest version
warn!("Updating config (new version - {})...", &native_date_from_milis(&prcs.date_of_creation).unwrap());
processes = prcs;
let _ = save_new_config(&processes, CONFIG_PATH);
},
std::cmp::Ordering::Equal => {
// server has sent equal version
processes = prcs;
},
std::cmp::Ordering::Greater => {
// server has sent an elder version
log::info!("Local config version is more actual");
processes = local;
},
}
} else {
// settings.json can be deleted
processes = prcs;
match std::fs::File::create(CONFIG_PATH) {
Ok(_) => {
// saving server conf (truncating or adding new)
let _ = save_new_config(&processes, CONFIG_PATH);
},
Err(_) => {
error!("Cannot create or truncate local config file");
},
} }
// let processes: Processes = {
// let mut _temp = String::new();
// loop {
// if let Ok(msg) = pubsub.get_message() {
// log::info!("Actual configuration is found !");
// _temp = msg.get_payload::<String>().expect("error getting payload");
// break;
// } else {
// continue;
// };
// }
// parse_extern_config(&_temp)
// };
if let None = processes { }
error!("no config (temp log)"); },
None => {
//temp
warn!("External config file parsing failed. Trying to get local config...");
if let Some(prcs) = load_processes(CONFIG_PATH) {
processes = prcs;
let date_of_creation: Result<i64, _> = processes.date_of_creation.parse();
if date_of_creation.is_err() {
error!("Local config date of creation parsing failed. Returning...");
return;
} else {
let date_of_creation = date_of_creation.unwrap();
if let Some(date) = chrono::DateTime::from_timestamp_millis(date_of_creation) {
log::info!("Config from {} is actual now!", date);
} else {
error!("Local config date of creation parsing failed. Returning...");
return; return;
} }
let processes = processes.unwrap(); }
} else {
error!("No config for runner. Cannot continue running...");
return;
}
},
}
}
log::info!("Current runner configuration: {}\n", &processes.date_of_creation);
if processes.processes.len() == 0 { if processes.processes.len() == 0 {
error!("Processes list is null, runner-rs initialization is stopped"); error!("Processes list is null, runner-rs initialization is stopped");
// eprintln!("Error: Processes list is null, runner-rs initialization is stopped");
return; return;
} }
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![]; let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
@ -195,13 +251,6 @@ async fn main() {
proc.dependencies.services.len() proc.dependencies.services.len()
); );
// println!("Process '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n",
// proc.name,
// proc.path,
// proc.dependencies.files.len(),
// proc.dependencies.services.len()
// );
// creating msg channel // creating msg channel
// can or should be executed in new thread // can or should be executed in new thread
let (tx, mut rx) = mpsc::channel::<u8>(1); let (tx, mut rx) = mpsc::channel::<u8>(1);
@ -219,8 +268,23 @@ async fn main() {
return; return;
} }
fn save_new_config(config: &str) -> Result<(), CustomError> { fn native_date_from_milis(mls: &str) -> Option<chrono::DateTime<Utc>> {
Ok(()) match mls.parse::<i64>(){
Ok(val) => return chrono::DateTime::from_timestamp_millis(val),
Err(_) => return None,
}
}
fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> {
match serde_json::to_string_pretty(&config) {
Ok(st) => {
match fs::write(config_file, st) {
Ok(_) => return Ok(()),
Err(_) => return Err(CustomError::Fatal),
}
},
Err(_) => return Err(CustomError::Fatal),
}
} }
async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> { async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {