From f503d2952b838dd96f87b74f395c402df110dea5 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 18 Feb 2025 13:21:21 +0300 Subject: [PATCH] init commit --- .gitignore | 3 ++ Cargo.toml | 17 +++++++++++ src/delivery.rs | 73 ++++++++++++++++++++++++++++++++++++++++++++++ src/integration.rs | 20 +++++++++++++ src/logger.rs | 50 +++++++++++++++++++++++++++++++ src/main.rs | 46 +++++++++++++++++++++++++++++ 6 files changed, 209 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/delivery.rs create mode 100644 src/integration.rs create mode 100644 src/logger.rs create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3549fae --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +Cargo.lock +.env \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..d6686e7 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "config-delivery" +version = "0.3.4" +edition = "2021" + +[dependencies] +dotenv = "0.15.0" +rand = "0.8.5" +serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.135" +tokio = { version = "1.43.0", features = ["full"] } +tokio-websockets = { version = "^0.11.0", features = ["client", "openssl", "rand"] } +integr-structs = {path = "../integr-structs"} +anyhow = "1.0.95" +env_logger = "0.11.6" +log = "0.4.25" +chrono = "0.4.39" diff --git a/src/delivery.rs b/src/delivery.rs new file mode 100644 index 0000000..6226764 --- /dev/null +++ b/src/delivery.rs @@ -0,0 +1,73 @@ +// mod to communicate with api-grub and preproc services +// using Unix-Socket Client + +use anyhow::{Error, Result}; +use integr_structs::api::ApiConfigV2; +use tokio::time::{sleep, Duration}; +use tokio::net::UnixStream; +use std::env; +use tokio::sync::mpsc::Receiver; +use log::{info, error}; +use serde_json::to_string_pretty; + +#[derive(Debug)] +enum UnixSocketConsumer { + ApiGrubber, + Preproc, +} +// to create us-client +struct UnixSocketClient; +// to catch new configs from integr submod +type ConfigGateway = Receiver; + +pub async fn init_delivery_mech(rx: ConfigGateway) -> Result<()> { + let mut rx = rx; + loop { + while rx.is_empty() { + continue; + }; + info!("New config was pulled. Redirecting it ..."); + let config = rx.recv().await.unwrap(); + match UnixSocketConsumer::ApiGrubber.get_stream_object().await { + Ok(socket) => { + socket.try_write(to_string_pretty(&config)?.as_bytes())?; + info!("New api config was pulled"); + }, + Err(er) => { + error!("Cannot coonect to ApiGrubber Unix-Socket due to {er}"); + } + } + sleep(Duration::from_secs(3)).await; + } + // let api_sock = UnixSocketConsumer::ApiGrubber.get_stream_object().await?; + // let preproc_sock = UnixSocketConsumer::Preproc.get_stream_object().await?; +} + +impl UnixSocketConsumer { + pub async fn get_stream_object(&self) -> Result { + let socket_file = match self { + UnixSocketConsumer::ApiGrubber => env::var("API_GRUBBER_SOCKET")?, + UnixSocketConsumer::Preproc => env::var("PREPROC_SOCKET")?, + }; + UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg(format!("Cannot create {:?} Unix-Socket client", self)))) + } +} + +// async fn check_unix_socket_file(path: &str) -> bool { +// std::path::Path::new(path).exists() +// } + +#[cfg(test)] +mod delivery_unittests { + #[allow(unused_imports)] + use super::*; + use tokio::test; + + // + #[test] + async fn check_api_unix_socket_client_creation() { assert!(true); } + + #[test] + async fn check_preproc_unix_socket_client_creation() { assert!(true); } + // +} \ No newline at end of file diff --git a/src/integration.rs b/src/integration.rs new file mode 100644 index 0000000..bf883a7 --- /dev/null +++ b/src/integration.rs @@ -0,0 +1,20 @@ +use std::sync::Arc; + +use tokio::sync::mpsc::Sender; +use integr_structs::api::ApiConfigV2; +use anyhow::Result; +use tokio::time::{sleep, Duration}; +// mock + +type Worker = Arc>; + +#[allow(dead_code, unused_variables)] +pub async fn init_integration_mech(tx: Worker) -> Result<()> { + loop { + sleep(Duration::from_secs(5)).await; + let conf = ApiConfigV2::default(); + tx.send(conf).await?; + } + // Ok(()) +} + diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..6578b44 --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,50 @@ +use chrono::Local; +use env_logger::Builder; +use log::LevelFilter; +use std::io::Write; +use anyhow::Result; +use log::info; + +pub async fn setup_logger() -> Result<()> { + Builder::new() + .format(move |buf, record| { + writeln!( + buf, + "|{}| {} [{}] - {}", + "config-delivery", + Local::now().format("%d-%m-%Y %H:%M:%S"), + record.level(), + record.args(), + ) + }) + .filter(None, LevelFilter::Info) + .target(env_logger::Target::Stdout) + .init(); + + info!("Logger configured"); + Ok(()) +} + + +#[cfg(test)] +mod logger_unittests { + use tokio::test; + use super::*; + #[test] + async fn check_logger_builder() { + Builder::new() + .format(move |buf, record| { + writeln!( + buf, + "|{}| {} [{}] - {}", + "config-delivery", + Local::now().format("%d-%m-%Y %H:%M:%S"), + record.level(), + record.args(), + ) + }) + .filter(None, LevelFilter::Info) + .target(env_logger::Target::Stdout) + .init(); + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..e5955b7 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,46 @@ +mod delivery; +mod integration; +mod logger; + +use integr_structs::api::ApiConfigV2; +use logger::setup_logger; +use dotenv::dotenv; +use anyhow::Result; +use tokio::sync::mpsc; +use log::{info, error}; +use integration::init_integration_mech; +use delivery::init_delivery_mech; +use std::sync::Arc; + +// Arch : +// 1) 2 Unix-Socket client (for api grub and preproc) :: i think its a continious process for events when services are unavailable +// 2) mpsc beetween `delivery` and `integration` :: +// 3) websocket client in `integration` to pull configs from Monitoring System :: + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<()> { + let _ = setup_logger().await?; + info!("Pulling env vars from .env file if exists ..."); + dotenv().ok(); + + let (tx, rx) = mpsc::channel::(1); + let tx = Arc::new(tx); + let integr_jh = tokio::spawn( + async move { + if let Err(er) = init_integration_mech(tx).await { + error!("Error in integration submodule during to: {}", er); + } + } + ); + let deliv_future = tokio::spawn(async move { + if let Err(er) = init_delivery_mech(rx).await { + error!("Error in delivery submodule during to: {}", er); + } + }); + // for deliv adding + // let action_vec: Vec> = vec![integr_jh, deliv_future]; + for event in vec![integr_jh, deliv_future] { + let _ = event.await; + } + Ok(()) +}