diff --git a/crates/api-grub/src/jitter.rs b/crates/api-grub/src/jitter.rs index eaa3a8b..dcdc8f2 100644 --- a/crates/api-grub/src/jitter.rs +++ b/crates/api-grub/src/jitter.rs @@ -1,9 +1,10 @@ use std::collections::{HashMap, HashSet}; +use integr_structs::api::v3::PrometheusMetricsExtended; use reqwest::Client; use lazy_static::lazy_static; -use tracing::{info, error}; +use tracing::{error, info, warn}; use serde_json::{Value, from_str}; - +use crate::export::Exporter; lazy_static! { static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences"); @@ -11,11 +12,11 @@ lazy_static! { } // conferences ids -type Conferences = HashSet; +type Conferences = HashSet<(String, String)>; // hash map {CONFERENCE_ID - JITTER_VALUE} -type OutputConferences = HashMap; +type OutputConferences = HashMap<(String, String), u32>; // hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} -type Output = HashMap; +type Jitter = HashMap<(String, String), OutputConferences>; // to handle async http requests #[derive(Debug)] @@ -54,7 +55,17 @@ impl Requester { .for_each(|conf_obj| { if let Some(id) = conf_obj.get("number") { if let Some(id) = id.as_str() { - hashset.insert(id.to_string()); + let id = { + if let Some(desc) = conf_obj.get("description") { + match desc.as_str() { + Some(desc) => (desc.to_string(), id.to_string()), + None => (String::new(), id.to_string()) + } + } else { + (String::new(), id.to_string()) + } + }; + hashset.insert(id); } } }); @@ -67,16 +78,16 @@ impl Requester { } Ok(hashset) } - pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result { - let mut output = Output::new(); + pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result { + let mut output = Jitter::new(); for conf in conferences { - let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf); + let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1); let req = self.client.get(url) .header("accept", "application/json") .header("x-api-key", &self.api_key); match req.send().await { - Err(er) => error!("Cannot GET data about conference - {}: {}", conf, er), + Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er), Ok(resp) => { match resp.text().await { Err(er) => error!("Cannot convert response into text: {}", er), @@ -89,9 +100,18 @@ impl Requester { // save id let id = { if let Some(id) = participant.get("id") { - serde_json::to_string(id)?.replace("\"", "") + let id = serde_json::to_string(id)?.replace("\"", ""); + if let Some(watermark) = participant.get("watermark") { + match watermark.as_str() { + Some(watermark) => (watermark.to_string(), id), + None => (String::new(), id), + } + } else { + error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf); + (String::new(), id) + } } else { - error!("No `id` field was found in participant from {} conference. Skipping ...", &conf); + error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf); continue; } }; @@ -99,19 +119,16 @@ impl Requester { let jitter = { if let Some(jitter) = params.get("jBLen") { // find substring - dbg!(&jitter); let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", ""); // string to u32 - dbg!(&temp); if let Ok(jitter) = temp.parse::() { jitter } else { - error!("Invalid type of `jBLen` field for participant {} in conference {}. Skipping ...", &id, &conf); + error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf); continue; } - } else { - error!("No `jBLen` field was found in participant {} from conference {}. Skipping ...", &id, &conf); + error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf); continue; } }; @@ -127,7 +144,7 @@ impl Requester { hm } ); } else { - error!("No `params` field in participant {} from conference {}. Skipping ...", &id, &conf); + error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf); } } } @@ -147,6 +164,28 @@ impl Requester { } Ok(output) } + async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended { + use integr_structs::api::v3::MetricOutputExtended; + let mut metrics = PrometheusMetricsExtended::new_empty_jitter(); + // for ((name, id), confs) in jitter { + // confs.iter() + // .for_each(|((conf_name, conf_id), &value)| {todo!()}); + // } + jitter.iter() + .map(|(participant, confs)| (confs.iter(), participant)) + .for_each(|(conf_iter, (name, id))| { + conf_iter.for_each(|((conf_name, conf_id), &jitter)| { + let metric_id = format!("j{}{}", id, conf_id); + let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id); + let val = serde_json::to_value(jitter).unwrap_or_else(|er| { + error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er); + Value::Null + }); + metrics.add(MetricOutputExtended::new_with_slices(&metric_id, "int", "Vinteo native", &desc, val)); + }); + }); + metrics + } } pub async fn init_grubbing_jitter() -> anyhow::Result<()> { @@ -154,14 +193,26 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> { let requester = Requester::new()?; // get conference id's loop { + info!("Getting conferences array ..."); let conferences = requester.get_conferences().await?; let conferences = Requester::get_conferences_from_value(conferences).await?; - // conferences.iter() - // .for_each(|id| { - // requester.get_users_jitter_by_conference(conference_id) - // }); + + info!("Getting jitter metrics by user in each conference ..."); let jitter = requester.get_users_jitter_by_conference(conferences).await?; - dbg!(jitter); + + info!("Casting jitter metrics into Metrics format for Prometheus ..."); + let metrics = Requester::get_metrics_from_jitter(jitter).await; + + if !metrics.metrics.is_empty() { + match Exporter::export_extended_metrics(metrics).await { + Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), + Err(er) => error!("Cannot export data: {}", er), + } + } else { + warn!("Metrics array is empty. Ignoring exporting ..."); + } + // Exporter::export_extended_metrics(metrics) + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } // get users' jitter info diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index e7984f5..349b84b 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -321,6 +321,16 @@ pub mod v3 { pub metrics: Vec, } impl PrometheusMetricsExtended { + pub fn new_empty_jitter() -> Self { + Self { + service_name : "zvks".to_owned(), + endpoint_name : "jitter".to_owned(), + metrics : Vec::new(), + } + } + pub fn add(&mut self, metric: MetricOutputExtended) { + self.metrics.push(metric); + } pub async fn new_zvks(metrics: Vec) -> Self { Self { service_name : "zvks".to_owned(),