From 0d449468de001a85ec47f774bc5662d8439e4293 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 29 Apr 2025 11:30:56 -0400 Subject: [PATCH 1/5] jitter support --- crates/api-grub/Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/api-grub/Cargo.toml b/crates/api-grub/Cargo.toml index 87895ad..b65e031 100644 --- a/crates/api-grub/Cargo.toml +++ b/crates/api-grub/Cargo.toml @@ -26,4 +26,5 @@ rand = "0.9.0" sysinfo = "0.33.1" openssl = { version = "0.10", features = ["vendored"] } tracing-subscriber = "0.3.19" -tracing = "0.1.41" \ No newline at end of file +tracing = "0.1.41" +lazy_static = "1.5.0" From 97369e6452f2d11e30a3b086d81586a9c96b5b52 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 29 Apr 2025 11:31:10 -0400 Subject: [PATCH 2/5] jitter support x2 --- crates/api-grub/src/jitter.rs | 168 ++++++++++++++++++++++++++++++++++ crates/api-grub/src/main.rs | 28 +++--- 2 files changed, 184 insertions(+), 12 deletions(-) create mode 100644 crates/api-grub/src/jitter.rs diff --git a/crates/api-grub/src/jitter.rs b/crates/api-grub/src/jitter.rs new file mode 100644 index 0000000..7c022ca --- /dev/null +++ b/crates/api-grub/src/jitter.rs @@ -0,0 +1,168 @@ +use std::collections::{HashMap, HashSet}; +use reqwest::Client; +use lazy_static::lazy_static; +use tracing::{info, error}; +use serde_json::{Value, from_str}; + + +lazy_static! { + static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences"); + static ref USERS_ENDPOINT: String = String::from("/api/v1/participants/"); +} + +// conferences ids +type Conferences = HashSet; +// hash map {CONFERENCE_ID - JITTER_VALUE} +type OutputConferences = HashMap; +// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} +type Output = HashMap; + +// to handle async http requests +#[derive(Debug)] +struct Requester { + base: String, + api_key: String, + client: Client, +} + +impl Requester { + pub fn new() -> anyhow::Result { + Ok(Self { + base: String::from("https://demo.vcs.vinteo.dev"), + api_key: std::env::var("VINTEO_API_KEY").unwrap(), + client: Client::builder().user_agent("api-grub").build()?, + }) + } + pub async fn get_conferences(&self) -> anyhow::Result { + let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT); + let req = self.client.get(url) + .header("accept", "application/json") + .header("x-api-key", &self.api_key); + let resp = req.send().await; + Ok(from_str(&resp?.text().await?)?) + } + pub async fn get_conferences_from_value(mut conferences: Value) -> anyhow::Result { + let mut hashset = Conferences::new(); + match conferences.get("data") { + None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")), + Some(data) => { + match data.get("conferences") { + None => return Err(anyhow::Error::msg("Invalid JSON format: no `conferences` field")), + Some(confs) => { + if let Some(confs) = confs.as_array() { + confs.iter() + .for_each(|conf_obj| { + if let Some(id) = conf_obj.get("number") { + if let Some(id) = id.as_str() { + hashset.insert(id.to_string()); + } + } + }); + } else { + return Err(anyhow::Error::msg("Invalid JSON format: `conferences` is not an array")) + } + }, + } + }, + } + Ok(hashset) + } + pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result { + let mut output = Output::new(); + for conf in conferences { + let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf); + let req = self.client.get(url) + .header("accept", "application/json") + .header("x-api-key", &self.api_key); + + match req.send().await { + Err(er) => error!("Cannot GET data about conference - {}: {}", conf, er), + Ok(resp) => { + match resp.text().await { + Err(er) => error!("Cannot convert response into text: {}", er), + Ok(resp) => { + let resp: Value = from_str(&resp)?; + if let Some(data) = resp.get("data") { + if let Some(parts) = data.get("participants") { + if let Some(parts) = parts.as_array() { + for participant in parts { + // save id + let id = { + if let Some(id) = participant.get("id") { + serde_json::to_string(id)?.replace("\"", "") + } else { + error!("No `id` field was found in participant from {} conference. Skipping ...", &conf); + continue; + } + }; + if let Some(params) = participant.get("params") { + let jitter = { + if let Some(jitter) = params.get("jBLen") { + // find substring + dbg!(&jitter); + let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", ""); + // string to u32 + dbg!(&temp); + if let Ok(jitter) = temp.parse::() { + jitter + } else { + error!("Invalid type of `jBLen` field for participant {} in conference {}. Skipping ...", &id, &conf); + continue; + } + + } else { + error!("No `jBLen` field was found in participant {} from conference {}. Skipping ...", &id, &conf); + continue; + } + }; + // hm push + output.entry(id) + .and_modify(|map| { + map.entry(conf.clone()) + .and_modify(|val| *val = jitter) + .or_insert(jitter); } ) + .or_insert( { + let mut hm = OutputConferences::new(); + hm.insert(conf.clone(), jitter); + hm + } ); + } else { + error!("No `params` field in participant {} from conference {}. Skipping ...", &id, &conf); + } + } + } + else { + return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array")) + } + } else { + return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field")) + } + } else { + return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")) + } + } + } + }, + } + } + Ok(output) + } +} + +pub async fn init_grubbing_jitter() -> anyhow::Result<()> { + // create requester + let requester = Requester::new()?; + // get conference id's + loop { + let conferences = requester.get_conferences().await?; + let conferences = Requester::get_conferences_from_value(conferences).await?; + // conferences.iter() + // .for_each(|id| { + // requester.get_users_jitter_by_conference(conference_id) + // }); + let jitter = requester.get_users_jitter_by_conference(conferences).await?; + dbg!(jitter); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + // get users' jitter info +} \ No newline at end of file diff --git a/crates/api-grub/src/main.rs b/crates/api-grub/src/main.rs index 34e2a9e..bbb05d0 100644 --- a/crates/api-grub/src/main.rs +++ b/crates/api-grub/src/main.rs @@ -4,6 +4,7 @@ mod logger; mod json; mod export; mod monitoring; +mod jitter; use anyhow::Result; use integr_structs::api::v3::Config; @@ -12,7 +13,10 @@ use config::{pull_local_config, init_config_grub_mechanism}; use net::init_api_grub_mechanism; use tokio::sync::mpsc; use tracing::{error, info, warn}; +// ENODE_MONITORING_IP use monitoring::get_metrics_from_monitoring; +// VINTEO_API_KEY +use jitter::init_grubbing_jitter; #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()>{ @@ -33,23 +37,23 @@ async fn main() -> Result<()>{ } }); let event_grub = tokio::spawn(async move { + // GRAB USING eNODE.MONITORING API GATEWAY if std::env::var("ENODE_MONITORING_IP").is_ok() { match get_metrics_from_monitoring(0, 5).await { - Ok(_) => { - info!("Grabing (eNODE.Monitoring) task deinitialized"); - }, - Err(er) => { - error!("Grabing task returned an error : {}", er); - }, + Ok(_) => info!("Grabing (eNODE.Monitoring) task de-initialized"), + Err(er) => error!("Grabing task returned an error : {}", er), } + // JITTER NATIVE GRAB TASK + } else if std::env::var("VINTEO_API_KEY").is_ok() { + match init_grubbing_jitter().await { + Ok(_) => info!("Grabing (Vinteo - Jitter native) task de-initialized"), + Err(er) => error!("Jitter grabing mechanism crushed : {}", er), + } + // NATIVE GRAB TASK USING `config_api.json` } else { match init_api_grub_mechanism(config, &mut rx).await { - Ok(_) => { - info!("Grabing task deinitialized"); - }, - Err(er) => { - error!("Grabing task returned an error : {}", er); - }, + Ok(_) => info!("Grabing task de-initialized"), + Err(er) => error!("Grabing task returned an error : {}", er), } } }); From b58c58b165d8823e68e3019af8d03520ae14db2b Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 29 Apr 2025 11:32:56 -0400 Subject: [PATCH 3/5] redactor mut --- crates/api-grub/src/jitter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/api-grub/src/jitter.rs b/crates/api-grub/src/jitter.rs index 7c022ca..eaa3a8b 100644 --- a/crates/api-grub/src/jitter.rs +++ b/crates/api-grub/src/jitter.rs @@ -41,7 +41,7 @@ impl Requester { let resp = req.send().await; Ok(from_str(&resp?.text().await?)?) } - pub async fn get_conferences_from_value(mut conferences: Value) -> anyhow::Result { + pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result { let mut hashset = Conferences::new(); match conferences.get("data") { None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")), From 0ea8213346c62ed3212eafdee86cc050a00b928c Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 29 Apr 2025 11:34:29 -0400 Subject: [PATCH 4/5] .env.example update --- .env.example | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.env.example b/.env.example index f17f574..d90a1df 100644 --- a/.env.example +++ b/.env.example @@ -9,6 +9,9 @@ DB_DBNAME = "db_name"1 # Prometheus-Exporter info EXPORTER_URL = "http(s)://ip.ip.ip.ip:port" +# VINTEO Jitter puller (needed to init Jitter native grab) +VINTEO_API_KEY = "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711" + # Status Model API support # > if exists, ignore `EXPORTER_URL` var STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input" From 12ee29d69925346b83584564f7ff0f5c5e19f0e7 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 30 Apr 2025 06:39:17 -0400 Subject: [PATCH 5/5] jitter finish --- crates/api-grub/src/jitter.rs | 97 ++++++++++++++++++++++++-------- crates/integr-structs/src/api.rs | 10 ++++ 2 files changed, 84 insertions(+), 23 deletions(-) diff --git a/crates/api-grub/src/jitter.rs b/crates/api-grub/src/jitter.rs index eaa3a8b..dcdc8f2 100644 --- a/crates/api-grub/src/jitter.rs +++ b/crates/api-grub/src/jitter.rs @@ -1,9 +1,10 @@ use std::collections::{HashMap, HashSet}; +use integr_structs::api::v3::PrometheusMetricsExtended; use reqwest::Client; use lazy_static::lazy_static; -use tracing::{info, error}; +use tracing::{error, info, warn}; use serde_json::{Value, from_str}; - +use crate::export::Exporter; lazy_static! { static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences"); @@ -11,11 +12,11 @@ lazy_static! { } // conferences ids -type Conferences = HashSet; +type Conferences = HashSet<(String, String)>; // hash map {CONFERENCE_ID - JITTER_VALUE} -type OutputConferences = HashMap; +type OutputConferences = HashMap<(String, String), u32>; // hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} -type Output = HashMap; +type Jitter = HashMap<(String, String), OutputConferences>; // to handle async http requests #[derive(Debug)] @@ -54,7 +55,17 @@ impl Requester { .for_each(|conf_obj| { if let Some(id) = conf_obj.get("number") { if let Some(id) = id.as_str() { - hashset.insert(id.to_string()); + let id = { + if let Some(desc) = conf_obj.get("description") { + match desc.as_str() { + Some(desc) => (desc.to_string(), id.to_string()), + None => (String::new(), id.to_string()) + } + } else { + (String::new(), id.to_string()) + } + }; + hashset.insert(id); } } }); @@ -67,16 +78,16 @@ impl Requester { } Ok(hashset) } - pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result { - let mut output = Output::new(); + pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result { + let mut output = Jitter::new(); for conf in conferences { - let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf); + let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1); let req = self.client.get(url) .header("accept", "application/json") .header("x-api-key", &self.api_key); match req.send().await { - Err(er) => error!("Cannot GET data about conference - {}: {}", conf, er), + Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er), Ok(resp) => { match resp.text().await { Err(er) => error!("Cannot convert response into text: {}", er), @@ -89,9 +100,18 @@ impl Requester { // save id let id = { if let Some(id) = participant.get("id") { - serde_json::to_string(id)?.replace("\"", "") + let id = serde_json::to_string(id)?.replace("\"", ""); + if let Some(watermark) = participant.get("watermark") { + match watermark.as_str() { + Some(watermark) => (watermark.to_string(), id), + None => (String::new(), id), + } + } else { + error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf); + (String::new(), id) + } } else { - error!("No `id` field was found in participant from {} conference. Skipping ...", &conf); + error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf); continue; } }; @@ -99,19 +119,16 @@ impl Requester { let jitter = { if let Some(jitter) = params.get("jBLen") { // find substring - dbg!(&jitter); let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", ""); // string to u32 - dbg!(&temp); if let Ok(jitter) = temp.parse::() { jitter } else { - error!("Invalid type of `jBLen` field for participant {} in conference {}. Skipping ...", &id, &conf); + error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf); continue; } - } else { - error!("No `jBLen` field was found in participant {} from conference {}. Skipping ...", &id, &conf); + error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf); continue; } }; @@ -127,7 +144,7 @@ impl Requester { hm } ); } else { - error!("No `params` field in participant {} from conference {}. Skipping ...", &id, &conf); + error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf); } } } @@ -147,6 +164,28 @@ impl Requester { } Ok(output) } + async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended { + use integr_structs::api::v3::MetricOutputExtended; + let mut metrics = PrometheusMetricsExtended::new_empty_jitter(); + // for ((name, id), confs) in jitter { + // confs.iter() + // .for_each(|((conf_name, conf_id), &value)| {todo!()}); + // } + jitter.iter() + .map(|(participant, confs)| (confs.iter(), participant)) + .for_each(|(conf_iter, (name, id))| { + conf_iter.for_each(|((conf_name, conf_id), &jitter)| { + let metric_id = format!("j{}{}", id, conf_id); + let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id); + let val = serde_json::to_value(jitter).unwrap_or_else(|er| { + error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er); + Value::Null + }); + metrics.add(MetricOutputExtended::new_with_slices(&metric_id, "int", "Vinteo native", &desc, val)); + }); + }); + metrics + } } pub async fn init_grubbing_jitter() -> anyhow::Result<()> { @@ -154,14 +193,26 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> { let requester = Requester::new()?; // get conference id's loop { + info!("Getting conferences array ..."); let conferences = requester.get_conferences().await?; let conferences = Requester::get_conferences_from_value(conferences).await?; - // conferences.iter() - // .for_each(|id| { - // requester.get_users_jitter_by_conference(conference_id) - // }); + + info!("Getting jitter metrics by user in each conference ..."); let jitter = requester.get_users_jitter_by_conference(conferences).await?; - dbg!(jitter); + + info!("Casting jitter metrics into Metrics format for Prometheus ..."); + let metrics = Requester::get_metrics_from_jitter(jitter).await; + + if !metrics.metrics.is_empty() { + match Exporter::export_extended_metrics(metrics).await { + Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), + Err(er) => error!("Cannot export data: {}", er), + } + } else { + warn!("Metrics array is empty. Ignoring exporting ..."); + } + // Exporter::export_extended_metrics(metrics) + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } // get users' jitter info diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index e7984f5..349b84b 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -321,6 +321,16 @@ pub mod v3 { pub metrics: Vec, } impl PrometheusMetricsExtended { + pub fn new_empty_jitter() -> Self { + Self { + service_name : "zvks".to_owned(), + endpoint_name : "jitter".to_owned(), + metrics : Vec::new(), + } + } + pub fn add(&mut self, metric: MetricOutputExtended) { + self.metrics.push(metric); + } pub async fn new_zvks(metrics: Vec) -> Self { Self { service_name : "zvks".to_owned(),