Compare commits

..

3 Commits

Author SHA1 Message Date
Vladislav Drozdov a8439d9a27
Merge pull request #10 from prplV/feature/EN-8
feature/en 8
2024-09-05 14:57:23 +03:00
prplV c63d7177ba added linux signals handling + new aliases 2024-09-05 14:55:38 +03:00
prplV 59edb21821 #refactoring 2024-09-05 10:51:29 +03:00
9 changed files with 123 additions and 50 deletions

View File

@ -3,8 +3,9 @@ runner = "riscv64-unknown-elf-gdb -q -x gdb_init"
linker = "riscv64-linux-gnu-gcc" linker = "riscv64-linux-gnu-gcc"
[alias] [alias]
x86_64 = "build --release" x86_64 = "build --release --target x86_64-unknown-linux-gnu"
riscv64 = "build --release --target riscv64gc-unknown-linux-gnu" riscv64 = "build --release --target riscv64gc-unknown-linux-gnu"
rbuild = "build --release"
[build] [build]
target = "x86_64-unknown-linux-gnu" target = "x86_64-unknown-linux-gnu"

View File

@ -7,26 +7,15 @@ use tokio::time::Duration;
static CONFIG_PATH : &'static str = "settings.json"; static CONFIG_PATH : &'static str = "settings.json";
// 4ever sync // 4ever sync
fn load_processes(json_filename: &str) -> Option<Processes>{ fn load_processes(json_filename: &str) -> Option<Processes>{
match fs::read_to_string(json_filename) { if let Ok(res) = fs::read_to_string(json_filename) {
Ok(res) => { if let Ok(conf) = serde_json::from_str::<Processes>(&res) {
match serde_json::from_str::<Processes>(&res) { return Some(conf);
Ok(conf) => { }
return Some(conf);
},
Err(_) => {
return None;
},
}
},
Err(_) => {
return None;
},
} }
None
} }
pub fn get_actual_config() -> Option<Processes> { pub fn get_actual_config() -> Option<Processes> {
// todo: local check Some|None -> redis check
// * if no conf -> loop and +inf getting conf from redis server // * if no conf -> loop and +inf getting conf from redis server
// let mut local = load_processes(&CONFIG_PATH); // let mut local = load_processes(&CONFIG_PATH);
match load_processes(&CONFIG_PATH) { match load_processes(&CONFIG_PATH) {
@ -46,7 +35,7 @@ pub fn get_actual_config() -> Option<Processes> {
}, },
} }
} }
return Some(local_conf); Some(local_conf)
}, },
None => { None => {
// ? ? OUTSTANDING CONSTRUCTION ? // ? ? OUTSTANDING CONSTRUCTION ?
@ -59,7 +48,7 @@ pub fn get_actual_config() -> Option<Processes> {
} }
} }
// ! once iter exec // ! once iter exec
// ! only for situation when local isnt None (no need to fck redis server) // ! only for situation when local isn't None (no need to fck redis server)
fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> { fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
match redis::Client::open(serv_info) { match redis::Client::open(serv_info) {
Ok(client) => { Ok(client) => {
@ -68,13 +57,13 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
if let Ok(len) = conn.xlen::<&str, usize>("config_stream") { if let Ok(len) = conn.xlen::<&str, usize>("config_stream") {
if len == 0 { if len == 0 {
warn!("No configuration in DB yet"); warn!("No configuration in DB yet");
return None; None
} else { } else {
let conf: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); let conf: redis::RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>> = conn.xrevrange_count("config_stream", "+", "-", 1);
let config: &Vec<(String, Vec<(String, String)>)>; let config: &Vec<(String, Vec<(String, String)>)>;
if conf.is_ok() { if conf.is_ok() {
// guarranted safe unwrapping // guaranteed and safe unwrapping
let conf = conf.unwrap(); let conf = conf.unwrap();
config = &conf[0]; config = &conf[0];
if config.is_empty() { if config.is_empty() {
@ -82,31 +71,31 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
return None; return None;
} }
match parse_extern_config(&config[0].1[0].1) { match parse_extern_config(&config[0].1[0].1) {
Some(prcs) => return Some(prcs), Some(prcs) => Some(prcs),
None => { None => {
error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!");
return None; None
}, },
} }
} else { } else {
error!("Configuration pulling from Redis stream failed. Check stream state!"); error!("Configuration pulling from Redis stream failed. Check stream state!");
return None; None
} }
} }
} else { } else {
error!("Cannot find config_stream. Check Redis-stream accessibility!"); error!("Cannot find config_stream. Check Redis-stream accessibility!");
return None; None
} }
}, },
Err(_) => { Err(_) => {
error!("Redis connection attempt is failed. Check Redis configuration!"); error!("Redis connection attempt is failed. Check Redis configuration!");
return None; None
}, },
} }
}, },
Err(_) => { Err(_) => {
error!("Redis-Client opening attempt is failed. Check network configuration!"); error!("Redis-Client opening attempt is failed. Check network configuration!");
return None; None
}, },
} }
} }
@ -117,7 +106,7 @@ fn open_watcher(serv_info: &str) -> redis::Client {
loop { loop {
match redis::Client::open(serv_info) { match redis::Client::open(serv_info) {
Ok(redis) => { Ok(redis) => {
info!("Succesfully opened Redis-Client"); info!("Successfully opened Redis-Client");
return redis return redis
}, },
Err(_) => { Err(_) => {

View File

@ -10,7 +10,7 @@ use std::borrow::BorrowMut;
pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> { pub async fn create_watcher(filename: &str, path: &str) -> Result<Inotify, std::io::Error> {
let src = format!("{}{}", path, filename); let src = format!("{}{}", path, filename);
let mut inotify = Inotify::init().unwrap_or_else(|_| { let inotify: Inotify = Inotify::init().unwrap_or_else(|_| {
error!("{}",format!("Cannot create watcher for {}", &src)); error!("{}",format!("Cannot create watcher for {}", &src));
std::process::exit(101); std::process::exit(101);
}); });
@ -32,7 +32,6 @@ pub async fn file_handler
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>> watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>
) -> Result<(), CustomError> ) -> Result<(), CustomError>
{ {
// println!("file daemon on {}", name);
for (i, file) in files.iter().enumerate() { for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename); // let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() { if check_file(&file.filename, &file.src).await.is_err() {
@ -81,7 +80,7 @@ pub async fn file_handler
if let EventMask::DELETE_SELF = event { if let EventMask::DELETE_SELF = event {
// ! warning (DELETE_SELF event) ! // ! warning (DELETE_SELF event) !
// println!("! warning (DELETE_SELF event) !"); // println!("! warning (DELETE_SELF event) !");
// * watcher recreation after dealing with file recreation mech in text editors // * watcher recreation after dealing with file recreation mechanism in text editors
let mutex = notify.borrow_mut(); let mutex = notify.borrow_mut();
*mutex = create_watcher(&file.filename, &file.src).await.unwrap(); *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
@ -114,8 +113,8 @@ pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string()); let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string()); let arc_path = Arc::new(path.to_string());
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let fileconcat = format!("{}{}", arc_path, arc_name); let file_concat = format!("{}{}", arc_path, arc_name);
let path = Path::new(&fileconcat); let path = Path::new(&file_concat);
if path.exists() { if path.exists() {
Ok(()) Ok(())
} else { } else {

View File

@ -1,11 +1,11 @@
use std::fs::OpenOptions; // use std::fs::OpenOptions;
use std::process::Command; // use std::process::Command;
use chrono::Local; use chrono::Local;
use env_logger::Builder; use env_logger::Builder;
use std::io::Write; use std::io::Write;
use log::LevelFilter; use log::LevelFilter;
use tokio::fs::File; // use tokio::fs::File;
use tokio::io::BufWriter; // use tokio::io::BufWriter;
use crate::utils::get_container_id; use crate::utils::get_container_id;
// struct CustomLogger<W: Write> { // struct CustomLogger<W: Write> {
@ -38,6 +38,7 @@ pub fn setup_logger() -> Result<(), crate::structs::CustomError> {
}) })
.filter(None, LevelFilter::Info) .filter(None, LevelFilter::Info)
.target(env_logger::Target::Stdout) .target(env_logger::Target::Stdout)
// temporary deprecated
// .target(env_logger::Target::Pipe(log_target)) // .target(env_logger::Target::Pipe(log_target))
.init(); .init();

View File

@ -5,6 +5,7 @@ mod prcs;
mod utils; mod utils;
mod services; mod services;
mod logger; mod logger;
mod signals;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
@ -13,6 +14,7 @@ use structs::*;
use config::*; use config::*;
use utils::*; use utils::*;
use logger::setup_logger; use logger::setup_logger;
use signals::set_valid_destructor;
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
@ -56,6 +58,15 @@ async fn main() {
}); });
handler.push(event); handler.push(event);
} }
// destructor addition
handler.push(
tokio::spawn(async move {
if let Err(_) = set_valid_destructor(Arc::new(&processes.processes)).await {
error!("Linux signals handler creation failed. Returning...");
return;
}
})
);
for i in handler { for i in handler {
i.await.unwrap(); i.await.unwrap();
} }

View File

@ -66,7 +66,7 @@ async fn looped_service_connecting(
}, },
} }
} }
return Ok(()); Ok(())
} else { } else {
let start = Instant::now(); let start = Instant::now();
while start.elapsed().as_secs() < serv.triggers.wait.into() { while start.elapsed().as_secs() < serv.triggers.wait.into() {
@ -82,7 +82,7 @@ async fn looped_service_connecting(
}, },
} }
} }
return Err(CustomError::Fatal); Err(CustomError::Fatal)
} }
} }
@ -94,11 +94,11 @@ async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
match addr.to_socket_addrs() { match addr.to_socket_addrs() {
Ok(mut addrs) => { Ok(mut addrs) => {
if let Some(_) = addrs.find(|a| TcpStream::connect_timeout(a, std::time::Duration::new(1, 0)).is_ok()) { if let Some(_) = addrs.find(|a| TcpStream::connect_timeout(a, std::time::Duration::new(1, 0)).is_ok()) {
return Ok(()); Ok(())
} else { } else {
return Err(CustomError::Fatal); Err(CustomError::Fatal)
} }
}, },
Err(_) => return Err(CustomError::Fatal), Err(_) => Err(CustomError::Fatal),
} }
} }

76
src/signals.rs Normal file
View File

@ -0,0 +1,76 @@
use crate::structs::{CustomError, TrackingProcess};
use std::sync::Arc;
use tokio::io;
use tokio::{
select,
signal::unix::{signal, Signal, SignalKind},
};
type PendingProcesses<'a> = Arc<&'a Vec<TrackingProcess>>;
pub async fn set_valid_destructor<'a>(prcs: PendingProcesses<'a>) -> Result<(), CustomError> {
let (mut int, mut term, mut stop) = (
Sig::new(Signals::Sigint),
Sig::new(Signals::Sigterm),
Sig::new(Signals::Sigstop),
);
// todo: select! for handlers and exec destructor
select! {
_ = int.post_processing(prcs.clone()) => {log::info!("Interrupting main thread...")},
_ = term.post_processing(prcs.clone()) => {log::info!("Terminating main thread...")},
_ = stop.post_processing(prcs.clone()) => {log::info!("Freezing main thread...")},
}
std::process::exit(1);
}
enum Signals {
Sigint,
Sigterm,
Sigstop,
}
struct Sig {
signal: Signal,
sig_type: Signals,
}
impl Sig {
fn new(signal_type: Signals) -> Self {
Sig {
signal: signal_type.get_signal().unwrap(),
sig_type: signal_type,
}
}
}
impl std::fmt::Display for Signals {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Signals::Sigint => write!(f, "SIGINT"),
Signals::Sigterm => write!(f, "SIGTERM"),
Signals::Sigstop => write!(f, "SIGSTOP"),
}
}
}
impl Signals {
fn get_signal(&self) -> io::Result<Signal> {
match self {
Signals::Sigint => signal(SignalKind::interrupt()),
Signals::Sigterm => signal(SignalKind::terminate()),
Signals::Sigstop => signal(SignalKind::quit()),
}
}
}
trait SigPostProcessing<'a> {
async fn post_processing(&mut self, prcs: PendingProcesses<'a>) -> io::Result<()>;
}
impl<'a> SigPostProcessing<'a> for Sig {
async fn post_processing(&mut self, prcs: PendingProcesses<'a>) -> io::Result<()> {
// manipulations ...
if let Some(_) = self.signal.recv().await {
log::info!("Got {}", self.sig_type);
prcs.iter().for_each(|proc| {
log::info!("Terminating {}", &proc.name);
let _ = crate::prcs::terminate_process(&proc.name);
})
}
Ok(())
}
}

View File

@ -1,6 +1,6 @@
use serde::{ Deserialize, Serialize }; use serde::{ Deserialize, Serialize };
/// # an Error enum (nextly will be deleted and replaced) /// # an Error enum (next will be deleted and replaced)
pub enum CustomError { pub enum CustomError {
Fatal, Fatal,
} }
@ -30,7 +30,7 @@ pub struct TrackingProcess {
pub dependencies: Dependencies, pub dependencies: Dependencies,
} }
/// # struct for processes' dependecies including files and services /// # struct for processes' dependencies including files and services
/// > (needed in serialization and deserialization) /// > (needed in serialization and deserialization)
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Dependencies { pub struct Dependencies {
@ -68,7 +68,7 @@ pub struct ServiceTriggers {
pub on_lost : String, pub on_lost : String,
} }
/// # struct for instancing each file's policies such as ondelete or onupdate events /// # struct for instancing each file's policies such as on-delete or onupdate events
/// > (needed in serialization and deserialization) /// > (needed in serialization and deserialization)
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FIleTriggers { pub struct FIleTriggers {

View File

@ -1,13 +1,10 @@
use std::fs::OpenOptions;
use std::sync::Arc; use std::sync::Arc;
use crate::structs::TrackingProcess; use crate::structs::TrackingProcess;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use inotify::Inotify; use inotify::Inotify;
use std::process::Command; use std::process::Command;
use chrono::Local;
use env_logger::Builder;
use crate::files::create_watcher; use crate::files::create_watcher;
use log::{error, warn, LevelFilter}; use log::{error, warn};
use crate::prcs::{ use crate::prcs::{
is_active, is_active,
is_frozen, is_frozen,
@ -19,7 +16,6 @@ use crate::prcs::{
}; };
use tokio::time::Duration; use tokio::time::Duration;
use tokio::join; use tokio::join;
use std::io::Write;
use crate::files::file_handler; use crate::files::file_handler;
use crate::services::service_handler; use crate::services::service_handler;