diff --git a/Cargo.toml b/Cargo.toml index bbeed62..8f70c37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" members = [ - "crates/api-grub", "crates/config-delivery", "crates/integr-structs", "crates/preproc", + "crates/api-grub", "crates/integr-structs", "crates/preproc", ] [profile.dev] diff --git a/crates/config-delivery/Cargo.toml b/crates/config-delivery/Cargo.toml deleted file mode 100644 index d6686e7..0000000 --- a/crates/config-delivery/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "config-delivery" -version = "0.3.4" -edition = "2021" - -[dependencies] -dotenv = "0.15.0" -rand = "0.8.5" -serde = { version = "1.0.217", features = ["derive"] } -serde_json = "1.0.135" -tokio = { version = "1.43.0", features = ["full"] } -tokio-websockets = { version = "^0.11.0", features = ["client", "openssl", "rand"] } -integr-structs = {path = "../integr-structs"} -anyhow = "1.0.95" -env_logger = "0.11.6" -log = "0.4.25" -chrono = "0.4.39" diff --git a/crates/config-delivery/src/delivery.rs b/crates/config-delivery/src/delivery.rs deleted file mode 100644 index 6226764..0000000 --- a/crates/config-delivery/src/delivery.rs +++ /dev/null @@ -1,73 +0,0 @@ -// mod to communicate with api-grub and preproc services -// using Unix-Socket Client - -use anyhow::{Error, Result}; -use integr_structs::api::ApiConfigV2; -use tokio::time::{sleep, Duration}; -use tokio::net::UnixStream; -use std::env; -use tokio::sync::mpsc::Receiver; -use log::{info, error}; -use serde_json::to_string_pretty; - -#[derive(Debug)] -enum UnixSocketConsumer { - ApiGrubber, - Preproc, -} -// to create us-client -struct UnixSocketClient; -// to catch new configs from integr submod -type ConfigGateway = Receiver; - -pub async fn init_delivery_mech(rx: ConfigGateway) -> Result<()> { - let mut rx = rx; - loop { - while rx.is_empty() { - continue; - }; - info!("New config was pulled. Redirecting it ..."); - let config = rx.recv().await.unwrap(); - match UnixSocketConsumer::ApiGrubber.get_stream_object().await { - Ok(socket) => { - socket.try_write(to_string_pretty(&config)?.as_bytes())?; - info!("New api config was pulled"); - }, - Err(er) => { - error!("Cannot coonect to ApiGrubber Unix-Socket due to {er}"); - } - } - sleep(Duration::from_secs(3)).await; - } - // let api_sock = UnixSocketConsumer::ApiGrubber.get_stream_object().await?; - // let preproc_sock = UnixSocketConsumer::Preproc.get_stream_object().await?; -} - -impl UnixSocketConsumer { - pub async fn get_stream_object(&self) -> Result { - let socket_file = match self { - UnixSocketConsumer::ApiGrubber => env::var("API_GRUBBER_SOCKET")?, - UnixSocketConsumer::Preproc => env::var("PREPROC_SOCKET")?, - }; - UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg(format!("Cannot create {:?} Unix-Socket client", self)))) - } -} - -// async fn check_unix_socket_file(path: &str) -> bool { -// std::path::Path::new(path).exists() -// } - -#[cfg(test)] -mod delivery_unittests { - #[allow(unused_imports)] - use super::*; - use tokio::test; - - // - #[test] - async fn check_api_unix_socket_client_creation() { assert!(true); } - - #[test] - async fn check_preproc_unix_socket_client_creation() { assert!(true); } - // -} \ No newline at end of file diff --git a/crates/config-delivery/src/integration.rs b/crates/config-delivery/src/integration.rs deleted file mode 100644 index bf883a7..0000000 --- a/crates/config-delivery/src/integration.rs +++ /dev/null @@ -1,20 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::mpsc::Sender; -use integr_structs::api::ApiConfigV2; -use anyhow::Result; -use tokio::time::{sleep, Duration}; -// mock - -type Worker = Arc>; - -#[allow(dead_code, unused_variables)] -pub async fn init_integration_mech(tx: Worker) -> Result<()> { - loop { - sleep(Duration::from_secs(5)).await; - let conf = ApiConfigV2::default(); - tx.send(conf).await?; - } - // Ok(()) -} - diff --git a/crates/config-delivery/src/logger.rs b/crates/config-delivery/src/logger.rs deleted file mode 100644 index 6578b44..0000000 --- a/crates/config-delivery/src/logger.rs +++ /dev/null @@ -1,50 +0,0 @@ -use chrono::Local; -use env_logger::Builder; -use log::LevelFilter; -use std::io::Write; -use anyhow::Result; -use log::info; - -pub async fn setup_logger() -> Result<()> { - Builder::new() - .format(move |buf, record| { - writeln!( - buf, - "|{}| {} [{}] - {}", - "config-delivery", - Local::now().format("%d-%m-%Y %H:%M:%S"), - record.level(), - record.args(), - ) - }) - .filter(None, LevelFilter::Info) - .target(env_logger::Target::Stdout) - .init(); - - info!("Logger configured"); - Ok(()) -} - - -#[cfg(test)] -mod logger_unittests { - use tokio::test; - use super::*; - #[test] - async fn check_logger_builder() { - Builder::new() - .format(move |buf, record| { - writeln!( - buf, - "|{}| {} [{}] - {}", - "config-delivery", - Local::now().format("%d-%m-%Y %H:%M:%S"), - record.level(), - record.args(), - ) - }) - .filter(None, LevelFilter::Info) - .target(env_logger::Target::Stdout) - .init(); - } -} \ No newline at end of file diff --git a/crates/config-delivery/src/main.rs b/crates/config-delivery/src/main.rs deleted file mode 100644 index e5955b7..0000000 --- a/crates/config-delivery/src/main.rs +++ /dev/null @@ -1,46 +0,0 @@ -mod delivery; -mod integration; -mod logger; - -use integr_structs::api::ApiConfigV2; -use logger::setup_logger; -use dotenv::dotenv; -use anyhow::Result; -use tokio::sync::mpsc; -use log::{info, error}; -use integration::init_integration_mech; -use delivery::init_delivery_mech; -use std::sync::Arc; - -// Arch : -// 1) 2 Unix-Socket client (for api grub and preproc) :: i think its a continious process for events when services are unavailable -// 2) mpsc beetween `delivery` and `integration` :: -// 3) websocket client in `integration` to pull configs from Monitoring System :: - -#[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<()> { - let _ = setup_logger().await?; - info!("Pulling env vars from .env file if exists ..."); - dotenv().ok(); - - let (tx, rx) = mpsc::channel::(1); - let tx = Arc::new(tx); - let integr_jh = tokio::spawn( - async move { - if let Err(er) = init_integration_mech(tx).await { - error!("Error in integration submodule during to: {}", er); - } - } - ); - let deliv_future = tokio::spawn(async move { - if let Err(er) = init_delivery_mech(rx).await { - error!("Error in delivery submodule during to: {}", er); - } - }); - // for deliv adding - // let action_vec: Vec> = vec![integr_jh, deliv_future]; - for event in vec![integr_jh, deliv_future] { - let _ = event.await; - } - Ok(()) -}