From d94b0f809b011bb84859d7be60b23e229bc6737d Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 21 Jan 2025 09:40:31 +0300 Subject: [PATCH 01/54] rc created --- crates/config-delivery/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/config-delivery/src/main.rs b/crates/config-delivery/src/main.rs index 2be1247..99eded1 100644 --- a/crates/config-delivery/src/main.rs +++ b/crates/config-delivery/src/main.rs @@ -18,7 +18,8 @@ async fn main() -> Result<()> { let _ = setup_logger().await?; dotenv().ok(); info!("Pulling env vars from .env file if exists ..."); - println!("Hello, world!"); + + // Ok(()) } -- 2.40.1 From c607bdc0d6c5520073f5cda336b9e433310a952b Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 21 Jan 2025 09:44:14 +0300 Subject: [PATCH 02/54] feat created --- crates/config-delivery/src/integration.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/config-delivery/src/integration.rs b/crates/config-delivery/src/integration.rs index e69de29..bdbe7b3 100644 --- a/crates/config-delivery/src/integration.rs +++ b/crates/config-delivery/src/integration.rs @@ -0,0 +1 @@ +// mock \ No newline at end of file -- 2.40.1 From 1e65db7ca510eef2d607f037219868cbc4a1fab9 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 21 Jan 2025 09:53:06 +0300 Subject: [PATCH 03/54] config: integration mock added --- crates/config-delivery/src/integration.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/crates/config-delivery/src/integration.rs b/crates/config-delivery/src/integration.rs index bdbe7b3..9d9e6f3 100644 --- a/crates/config-delivery/src/integration.rs +++ b/crates/config-delivery/src/integration.rs @@ -1 +1,16 @@ -// mock \ No newline at end of file +use tokio::sync::mpsc::Sender; +use integr_structs::api::ApiConfig; +use anyhow::Result; +use tokio::time::{sleep, Duration}; +// mock + +#[allow(dead_code, unused_variables)] +pub async fn init_integration_mech(tx: &Sender) -> Result<()> { + loop { + sleep(Duration::from_secs(10)).await; + let conf = ApiConfig::default(); + tx.send(conf).await?; + } + // Ok(()) +} + -- 2.40.1 From a225ec690ee146cb2509c6c58bc149b7810c72c0 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 21 Jan 2025 10:43:05 +0300 Subject: [PATCH 04/54] delivery tiny logic --- crates/config-delivery/src/delivery.rs | 30 ++++++++++++++++++++++- crates/config-delivery/src/integration.rs | 8 ++++-- crates/config-delivery/src/main.rs | 30 ++++++++++++++++++++--- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/crates/config-delivery/src/delivery.rs b/crates/config-delivery/src/delivery.rs index 38afe4b..287b567 100644 --- a/crates/config-delivery/src/delivery.rs +++ b/crates/config-delivery/src/delivery.rs @@ -6,13 +6,41 @@ use integr_structs::api::ApiConfig; use tokio::time::{sleep, Instant}; use tokio::net::UnixStream; use std::env; +use tokio::sync::mpsc::Receiver; +use log::{info, error}; +use serde_json::to_string_pretty; +#[derive(Debug)] enum UnixSocketConsumer { ApiGrubber, Preproc, } // to create us-client struct UnixSocketClient; +// to catch new configs from integr submod +type ConfigGateway = Receiver; + +pub async fn init_delivery_mech(rx: ConfigGateway) -> Result<()> { + let mut rx = rx; + loop { + while rx.is_empty() { + continue; + }; + info!("New config was pulled. Redirecting it ..."); + let config = rx.recv().await.unwrap(); + match UnixSocketConsumer::ApiGrubber.get_stream_object().await { + Ok(socket) => { + socket.try_write(to_string_pretty(&config)?.as_bytes())?; + }, + Err(er) => { + error!("Cannot coonect to ApiGrubber Unix-Socket due to {er}"); + } + } + } + // let api_sock = UnixSocketConsumer::ApiGrubber.get_stream_object().await?; + // let preproc_sock = UnixSocketConsumer::Preproc.get_stream_object().await?; + Ok(()) +} impl UnixSocketConsumer { pub async fn get_stream_object(&self) -> Result { @@ -20,7 +48,7 @@ impl UnixSocketConsumer { UnixSocketConsumer::ApiGrubber => env::var("API_GRUBBER_SOCKET")?, UnixSocketConsumer::Preproc => env::var("PREPROC_SOCKET")?, }; - UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg("Cannot create Unix-Socket client"))) + UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg(format!("Cannot create {:?} Unix-Socket client", self)))) } } diff --git a/crates/config-delivery/src/integration.rs b/crates/config-delivery/src/integration.rs index 9d9e6f3..b2b9f55 100644 --- a/crates/config-delivery/src/integration.rs +++ b/crates/config-delivery/src/integration.rs @@ -1,13 +1,17 @@ +use std::sync::Arc; + use tokio::sync::mpsc::Sender; use integr_structs::api::ApiConfig; use anyhow::Result; use tokio::time::{sleep, Duration}; // mock +type Worker = Arc>; + #[allow(dead_code, unused_variables)] -pub async fn init_integration_mech(tx: &Sender) -> Result<()> { +pub async fn init_integration_mech(tx: Worker) -> Result<()> { loop { - sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(5)).await; let conf = ApiConfig::default(); tx.send(conf).await?; } diff --git a/crates/config-delivery/src/main.rs b/crates/config-delivery/src/main.rs index 99eded1..4e20dc5 100644 --- a/crates/config-delivery/src/main.rs +++ b/crates/config-delivery/src/main.rs @@ -2,11 +2,16 @@ mod delivery; mod integration; mod logger; +use integr_structs::api::ApiConfig; use logger::setup_logger; use dotenv::dotenv; use anyhow::Result; use tokio::sync::mpsc; -use log::info; +use log::{info, error}; +use integration::init_integration_mech; +use delivery::init_delivery_mech; +use tokio::task::JoinHandle; +use std::sync::Arc; // Arch : // 1) 2 Unix-Socket client (for api grub and preproc) :: i think its a continious process for events when services are unavailable @@ -16,10 +21,27 @@ use log::info; #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let _ = setup_logger().await?; - dotenv().ok(); info!("Pulling env vars from .env file if exists ..."); + dotenv().ok(); - // - + let (tx, rx) = mpsc::channel::(1); + let tx = Arc::new(tx); + let integr_jh = tokio::spawn( + async move { + if let Err(er) = init_integration_mech(tx).await { + error!("Error in integration submodule during to: {}", er); + } + } + ); + let deliv_future = tokio::spawn(async move { + if let Err(er) = init_delivery_mech(rx).await { + error!("Error in delivery submodule during to: {}", er); + } + }); + // for deliv adding + // let action_vec: Vec> = vec![integr_jh, deliv_future]; + for event in vec![integr_jh, deliv_future] { + let _ = event.await; + } Ok(()) } -- 2.40.1 From 1f5656f7b8eccb8707ab41f5d02d0c6ac0cdc7b8 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 21 Jan 2025 16:02:40 +0300 Subject: [PATCH 05/54] config template added to the repo --- template_global_config.json | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 template_global_config.json diff --git a/template_global_config.json b/template_global_config.json new file mode 100644 index 0000000..699cae2 --- /dev/null +++ b/template_global_config.json @@ -0,0 +1,21 @@ +{ + "id" : 1 , + "template" : + [{ + "id" :"conference", + "name" : "Conference", + "url" : "https://$ip_address/api/v1/conferences", + "measure" : + [ + "total", "number", "description", + "shutdown", "startTime", "stopTime", + "participants_total", "participants_online" + ] + }], + "ip_address" : "127.0.0.1:8081", + "login" : "", + "pass" : "" , + "api_key" : "908c709827bd40n98r7209837x98273", + "period" : "10", + "timeout" : "2" +} -- 2.40.1 From ab75252e6b9a7846ded0a42f186f5eacddd196fb Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 21 Jan 2025 16:03:16 +0300 Subject: [PATCH 06/54] refactor#1 --- crates/api-grub/src/main.rs | 1 + crates/config-delivery/src/delivery.rs | 14 ++++++++------ crates/config-delivery/src/main.rs | 1 - 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/crates/api-grub/src/main.rs b/crates/api-grub/src/main.rs index 208d646..bbd48ba 100644 --- a/crates/api-grub/src/main.rs +++ b/crates/api-grub/src/main.rs @@ -21,6 +21,7 @@ async fn main() -> Result<()>{ // config update channel let (tx, mut rx) = mpsc::channel::(1); // futures + // todo : rewrite with spawn let config_fut = init_config_grub_mechanism(&tx); let grub_fut = init_api_grub_mechanism(config, &mut rx); diff --git a/crates/config-delivery/src/delivery.rs b/crates/config-delivery/src/delivery.rs index 287b567..c75bbf9 100644 --- a/crates/config-delivery/src/delivery.rs +++ b/crates/config-delivery/src/delivery.rs @@ -3,7 +3,7 @@ use anyhow::{Error, Result}; use integr_structs::api::ApiConfig; -use tokio::time::{sleep, Instant}; +use tokio::time::{sleep, Duration}; use tokio::net::UnixStream; use std::env; use tokio::sync::mpsc::Receiver; @@ -31,15 +31,16 @@ pub async fn init_delivery_mech(rx: ConfigGateway) -> Result<()> { match UnixSocketConsumer::ApiGrubber.get_stream_object().await { Ok(socket) => { socket.try_write(to_string_pretty(&config)?.as_bytes())?; + info!("New api config was pulled"); }, Err(er) => { error!("Cannot coonect to ApiGrubber Unix-Socket due to {er}"); } } + sleep(Duration::from_secs(3)).await; } // let api_sock = UnixSocketConsumer::ApiGrubber.get_stream_object().await?; // let preproc_sock = UnixSocketConsumer::Preproc.get_stream_object().await?; - Ok(()) } impl UnixSocketConsumer { @@ -52,12 +53,13 @@ impl UnixSocketConsumer { } } -async fn check_unix_socket_file(path: &str) -> bool { - std::path::Path::new(path).exists() -} +// async fn check_unix_socket_file(path: &str) -> bool { +// std::path::Path::new(path).exists() +// } -#[cfg(tests)] +#[cfg(test)] mod delivery_unittests { + #[allow(unused_imports)] use super::*; use tokio::test; diff --git a/crates/config-delivery/src/main.rs b/crates/config-delivery/src/main.rs index 4e20dc5..7f8803b 100644 --- a/crates/config-delivery/src/main.rs +++ b/crates/config-delivery/src/main.rs @@ -10,7 +10,6 @@ use tokio::sync::mpsc; use log::{info, error}; use integration::init_integration_mech; use delivery::init_delivery_mech; -use tokio::task::JoinHandle; use std::sync::Arc; // Arch : -- 2.40.1 From d3fcfae15b8017f9038408b72cbf63c74931e782 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 21 Jan 2025 16:31:10 +0300 Subject: [PATCH 07/54] new valid structs for sharing between services --- crates/integr-structs/src/api.rs | 35 ++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index 70f07a4..82e8f5d 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -1,4 +1,7 @@ +use std::collections::{HashMap, HashSet}; + use serde::{Serialize, Deserialize}; +use serde_json::Value; #[derive(Serialize, Deserialize, Debug)] @@ -21,4 +24,36 @@ impl Default for ApiConfig { delay : 0, } } +} + +// v2 +#[derive(Serialize, Deserialize, Debug)] +pub struct ApiConfigV2 { + id : u64, + #[serde(default)] + template : Vec