From a225ec690ee146cb2509c6c58bc149b7810c72c0 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 21 Jan 2025 10:43:05 +0300 Subject: [PATCH] delivery tiny logic --- crates/config-delivery/src/delivery.rs | 30 ++++++++++++++++++++++- crates/config-delivery/src/integration.rs | 8 ++++-- crates/config-delivery/src/main.rs | 30 ++++++++++++++++++++--- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/crates/config-delivery/src/delivery.rs b/crates/config-delivery/src/delivery.rs index 38afe4b..287b567 100644 --- a/crates/config-delivery/src/delivery.rs +++ b/crates/config-delivery/src/delivery.rs @@ -6,13 +6,41 @@ use integr_structs::api::ApiConfig; use tokio::time::{sleep, Instant}; use tokio::net::UnixStream; use std::env; +use tokio::sync::mpsc::Receiver; +use log::{info, error}; +use serde_json::to_string_pretty; +#[derive(Debug)] enum UnixSocketConsumer { ApiGrubber, Preproc, } // to create us-client struct UnixSocketClient; +// to catch new configs from integr submod +type ConfigGateway = Receiver; + +pub async fn init_delivery_mech(rx: ConfigGateway) -> Result<()> { + let mut rx = rx; + loop { + while rx.is_empty() { + continue; + }; + info!("New config was pulled. Redirecting it ..."); + let config = rx.recv().await.unwrap(); + match UnixSocketConsumer::ApiGrubber.get_stream_object().await { + Ok(socket) => { + socket.try_write(to_string_pretty(&config)?.as_bytes())?; + }, + Err(er) => { + error!("Cannot coonect to ApiGrubber Unix-Socket due to {er}"); + } + } + } + // let api_sock = UnixSocketConsumer::ApiGrubber.get_stream_object().await?; + // let preproc_sock = UnixSocketConsumer::Preproc.get_stream_object().await?; + Ok(()) +} impl UnixSocketConsumer { pub async fn get_stream_object(&self) -> Result { @@ -20,7 +48,7 @@ impl UnixSocketConsumer { UnixSocketConsumer::ApiGrubber => env::var("API_GRUBBER_SOCKET")?, UnixSocketConsumer::Preproc => env::var("PREPROC_SOCKET")?, }; - UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg("Cannot create Unix-Socket client"))) + UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg(format!("Cannot create {:?} Unix-Socket client", self)))) } } diff --git a/crates/config-delivery/src/integration.rs b/crates/config-delivery/src/integration.rs index 9d9e6f3..b2b9f55 100644 --- a/crates/config-delivery/src/integration.rs +++ b/crates/config-delivery/src/integration.rs @@ -1,13 +1,17 @@ +use std::sync::Arc; + use tokio::sync::mpsc::Sender; use integr_structs::api::ApiConfig; use anyhow::Result; use tokio::time::{sleep, Duration}; // mock +type Worker = Arc>; + #[allow(dead_code, unused_variables)] -pub async fn init_integration_mech(tx: &Sender) -> Result<()> { +pub async fn init_integration_mech(tx: Worker) -> Result<()> { loop { - sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(5)).await; let conf = ApiConfig::default(); tx.send(conf).await?; } diff --git a/crates/config-delivery/src/main.rs b/crates/config-delivery/src/main.rs index 99eded1..4e20dc5 100644 --- a/crates/config-delivery/src/main.rs +++ b/crates/config-delivery/src/main.rs @@ -2,11 +2,16 @@ mod delivery; mod integration; mod logger; +use integr_structs::api::ApiConfig; use logger::setup_logger; use dotenv::dotenv; use anyhow::Result; use tokio::sync::mpsc; -use log::info; +use log::{info, error}; +use integration::init_integration_mech; +use delivery::init_delivery_mech; +use tokio::task::JoinHandle; +use std::sync::Arc; // Arch : // 1) 2 Unix-Socket client (for api grub and preproc) :: i think its a continious process for events when services are unavailable @@ -16,10 +21,27 @@ use log::info; #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let _ = setup_logger().await?; - dotenv().ok(); info!("Pulling env vars from .env file if exists ..."); + dotenv().ok(); - // - + let (tx, rx) = mpsc::channel::(1); + let tx = Arc::new(tx); + let integr_jh = tokio::spawn( + async move { + if let Err(er) = init_integration_mech(tx).await { + error!("Error in integration submodule during to: {}", er); + } + } + ); + let deliv_future = tokio::spawn(async move { + if let Err(er) = init_delivery_mech(rx).await { + error!("Error in delivery submodule during to: {}", er); + } + }); + // for deliv adding + // let action_vec: Vec> = vec![integr_jh, deliv_future]; + for event in vec![integr_jh, deliv_future] { + let _ = event.await; + } Ok(()) }