From af604c55a6342de899e8747eba07eeea69a91a41 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 26 May 2025 09:19:57 -0400 Subject: [PATCH 1/3] metrics vinteo native update --- crates/api-grub/Cargo.toml | 2 + crates/api-grub/src/jitter.rs | 267 +++++++++++++++++++++------------- 2 files changed, 168 insertions(+), 101 deletions(-) diff --git a/crates/api-grub/Cargo.toml b/crates/api-grub/Cargo.toml index 17e7806..bd5d13e 100644 --- a/crates/api-grub/Cargo.toml +++ b/crates/api-grub/Cargo.toml @@ -28,3 +28,5 @@ openssl = { version = "0.10", features = ["vendored"] } tracing-subscriber = "0.3.19" tracing = "0.1.41" lazy_static = "1.5.0" +futures = "0.3.31" +async-stream = "0.3.6" diff --git a/crates/api-grub/src/jitter.rs b/crates/api-grub/src/jitter.rs index 9b0cbe6..1ab47c7 100644 --- a/crates/api-grub/src/jitter.rs +++ b/crates/api-grub/src/jitter.rs @@ -1,10 +1,14 @@ -use std::collections::{HashMap, HashSet}; -use integr_structs::api::v3::PrometheusMetricsExtended; +use std::{collections::{HashMap, HashSet}, future::Future}; +use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended}; +use std::sync::Arc; use reqwest::Client; use lazy_static::lazy_static; use tracing::{error, info, warn}; +use std::pin::Pin; use serde_json::{Value, from_str}; use crate::export::Exporter; +use async_stream::stream; +use futures::{future, pin_mut, stream::Stream, StreamExt}; lazy_static! { static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences"); @@ -13,10 +17,35 @@ lazy_static! { // conferences ids type Conferences = HashSet<(String, String)>; -// hash map {CONFERENCE_ID - JITTER_VALUE} -type OutputConferences = HashMap<(String, String), u32>; +// all participants per each conference +// type AllConferencesInfo = Vec; +// hash map {CONFERENCE_ID - CONFERENCE_VALUE} +type OutputConferences = HashMap<(String, String), Value>; // hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} -type Jitter = HashMap<(String, String), OutputConferences>; +// type Jitter = HashMap<(String, String), OutputConferences>; + +#[derive(Debug, Clone)] +struct ConferencesDigital(OutputConferences); + +impl ConferencesDigital { + pub fn new(item: OutputConferences) -> Self { + Self(item) + } + pub fn go_across(self) -> impl Stream)> { + stream! { + for (key, value) in self.0 { + info!("Conference {} ({}) is processing ...", key.0, key.1); + yield (key, Arc::new(value)); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + } + } + pub fn clone_confs(&self) -> OutputConferences { + self.0.clone() + } + +} + // to handle async http requests #[derive(Debug)] @@ -79,8 +108,8 @@ impl Requester { } Ok(hashset) } - pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result { - let mut output = Jitter::new(); + pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result { + let mut output = OutputConferences::new(); for conf in conferences { let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1); let req = self.client.get(url) @@ -95,70 +124,7 @@ impl Requester { Err(er) => error!("Cannot convert response into text: {}", er), Ok(resp) => { let resp: Value = from_str(&resp)?; - if let Some(data) = resp.get("data") { - if let Some(parts) = data.get("participants") { - if let Some(parts) = parts.as_array() { - for participant in parts { - // save id - let id = { - if let Some(id) = participant.get("id") { - let id = serde_json::to_string(id)?.replace("\"", ""); - if let Some(watermark) = participant.get("watermark") { - match watermark.as_str() { - Some(watermark) => (watermark.to_string(), id), - None => (String::new(), id), - } - } else { - error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf); - (String::new(), id) - } - } else { - error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf); - continue; - } - }; - if let Some(params) = participant.get("params") { - let jitter = { - if let Some(jitter) = params.get("jBLen") { - // find substring - let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", ""); - // string to u32 - if let Ok(jitter) = temp.parse::() { - jitter - } else { - error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf); - continue; - } - } else { - error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf); - continue; - } - }; - // hm push - output.entry(id) - .and_modify(|map| { - map.entry(conf.clone()) - .and_modify(|val| *val = jitter) - .or_insert(jitter); } ) - .or_insert( { - let mut hm = OutputConferences::new(); - hm.insert(conf.clone(), jitter); - hm - } ); - } else { - error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf); - } - } - } - else { - return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array")) - } - } else { - return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field")) - } - } else { - return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")) - } + output.entry(conf).or_insert(resp); } } }, @@ -166,28 +132,72 @@ impl Requester { } Ok(output) } - async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended { - use integr_structs::api::v3::MetricOutputExtended; - let mut metrics = PrometheusMetricsExtended::new_empty_jitter(); - // for ((name, id), confs) in jitter { - // confs.iter() - // .for_each(|((conf_name, conf_id), &value)| {todo!()}); - // } - jitter.iter() - .map(|(participant, confs)| (confs.iter(), participant)) - .for_each(|(conf_iter, (name, id))| { - conf_iter.for_each(|((conf_name, conf_id), &jitter)| { - let metric_id = format!("j{}{}", id, conf_id); - let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id); - let val = serde_json::to_value(jitter).unwrap_or_else(|er| { - error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er); - Value::Null - }); - metrics.add(MetricOutputExtended::new_with_slices(&metric_id, &metric_id,"int", "Vinteo native", &desc, None, None,val)); - }); - }); - metrics +} + + + fn across_participants(conf: &Value) -> impl Stream> + '_ { + let participants_iter = conf + .get("data") + .and_then(|data| data.get("participants")) + .and_then(|participants| participants.as_array()) + .into_iter() + .flatten() + .cloned(); + + stream! { + for participant in participants_iter { + yield Arc::new(participant); + } + } } + + async fn gather_futures(parts: Arc, conf_id: Arc, conf_desc: Arc, targets: Vec<&str>) -> Vec> { + let mut futures: Vec> + Send>>> = Vec::new(); + for target in targets { + futures.push(Box::pin(extract_from_participant( + parts.clone(), + target, + conf_id.clone(), + conf_desc.clone() + ))); + } + future::join_all(futures).await + } + +// GET participants/{conference_id} data.total +async fn extract_total_participants(conferences : Arc) -> Value { + if let Some(total) = conferences + .get("data") + .and_then(|data| data.get("total")) { + return total.clone() + } + Value::Null +} +// GET participants/{conference_id} data.participants[].isAnonymous +async fn extract_from_participant(participant : Arc, target: &str, conf_id: Arc, conf_desc: Arc) -> Option { + if let Some(value) = participant.get("params") + .and_then(|params| params.get(target)) { + return Some( + MetricOutputExtended::new_with_slices( + target, + target, + "type", + "Vinteo native", + format!( + "Значение параметра `{}` для пользователя {} ({}) в конференции {} ({})", + target, + participant.get("watermark").unwrap_or_else(|| &Value::Null), + participant.get("id").unwrap_or_else(|| &Value::Null), + conf_desc, + conf_id + ).as_ref(), + Some(18), + Some("module$12".to_string()), + value.clone() + ) + ) + } + None } pub async fn init_grubbing_jitter() -> anyhow::Result<()> { @@ -200,19 +210,74 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> { let conferences = Requester::get_conferences_from_value(conferences).await?; info!("Getting jitter metrics by user in each conference ..."); - let jitter = requester.get_users_jitter_by_conference(conferences).await?; + let confs = requester.get_users_jitter_by_conference(conferences).await?; - info!("Casting jitter metrics into Metrics format for Prometheus ..."); - let metrics = Requester::get_metrics_from_jitter(jitter).await; + info!("Initializing extraction mechanism ..."); + let conferences = ConferencesDigital::new(confs); + let full_iterable = Arc::new(conferences.clone_confs()); + let extracted_conference = conferences.go_across(); + let mut buffer: Vec = Vec::new(); + pin_mut!(extracted_conference); - if !metrics.metrics.is_empty() { - match Exporter::export_extended_metrics(metrics).await { - Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), - Err(er) => error!("Cannot export data: {}", er), + while let Some(((id, desc), item)) = extracted_conference.next().await { + let parts_stream = across_participants(&item); + pin_mut!(parts_stream); + + let total_participants = MetricOutputExtended::new_with_slices( + format!("TotalParticipantsIn{}", id).as_ref(), + format!("TotalParticipantsIn{}", id).as_ref(), + "type", + "Vinteo Native", + format!("Общее количество участников в конференции {}", &desc).as_ref(), + Some(18), + Some(String::from("module$12")), + extract_total_participants(item.clone()).await + ); + + // dbg!(total_participants); + + while let Some(participant) = parts_stream.next().await { + let metrics = gather_futures( + participant, + Arc::from(id.as_ref()), + Arc::from(desc.as_ref()), + vec![ + "txBitrate", + "rxBitrate", + "txFPS", + "rxFPS", + "rxLost", + "txLost", + "jBLen", + "txLostPercent", + "rxLostPercent", + "txH239Bitrate", + "rxH239Bitrate", + "txVideoCodec", + "rxVideoCodec", + "txAudioCodec", + "rxAudioCodec", + "txResolution", + "rxResolution", + "txH239Resolution", + "rxH239Resolution", + "duration", + ], + ).await; + dbg!(metrics); } - } else { - warn!("Metrics array is empty. Ignoring exporting ..."); } + + // let metrics = Requester::get_metrics_from_jitter(jitter).await; + + // if !metrics.metrics.is_empty() { + // match Exporter::export_extended_metrics(metrics).await { + // Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), + // Err(er) => error!("Cannot export data: {}", er), + // } + // } else { + // warn!("Metrics array is empty. Ignoring exporting ..."); + // } // Exporter::export_extended_metrics(metrics) tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; From 7cc7c0799aebabe4d8a3429d426941d656c6f402 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 26 May 2025 13:28:15 -0400 Subject: [PATCH 2/3] fix and finish --- crates/api-grub/src/jitter.rs | 142 ++++++++++++++++++++-------------- 1 file changed, 84 insertions(+), 58 deletions(-) diff --git a/crates/api-grub/src/jitter.rs b/crates/api-grub/src/jitter.rs index 1ab47c7..19899c4 100644 --- a/crates/api-grub/src/jitter.rs +++ b/crates/api-grub/src/jitter.rs @@ -17,12 +17,7 @@ lazy_static! { // conferences ids type Conferences = HashSet<(String, String)>; -// all participants per each conference -// type AllConferencesInfo = Vec; -// hash map {CONFERENCE_ID - CONFERENCE_VALUE} type OutputConferences = HashMap<(String, String), Value>; -// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} -// type Jitter = HashMap<(String, String), OutputConferences>; #[derive(Debug, Clone)] struct ConferencesDigital(OutputConferences); @@ -40,10 +35,6 @@ impl ConferencesDigital { } } } - pub fn clone_confs(&self) -> OutputConferences { - self.0.clone() - } - } @@ -151,14 +142,15 @@ impl Requester { } } - async fn gather_futures(parts: Arc, conf_id: Arc, conf_desc: Arc, targets: Vec<&str>) -> Vec> { + async fn gather_futures(parts: Arc, conf_id: Arc, conf_desc: Arc, targets: Vec<(&str, &str)>) -> Vec> { let mut futures: Vec> + Send>>> = Vec::new(); - for target in targets { + for (target, description) in targets { futures.push(Box::pin(extract_from_participant( parts.clone(), target, conf_id.clone(), - conf_desc.clone() + conf_desc.clone(), + description ))); } future::join_all(futures).await @@ -173,21 +165,39 @@ async fn extract_total_participants(conferences : Arc) -> Value { } Value::Null } + +async fn extract_total_anon_participants(conferences : Arc) -> Value { + let sum = conferences + .get("data") + .and_then(|data| data.get("participants")) + .and_then(|parts| parts.as_array()) + .iter() + .flat_map(|&val| val.iter()) + .filter_map(|part| part.get("isAnonymous")) + .filter_map(|is_anon| is_anon.as_bool()) + .filter(|is_anon| *is_anon == true) + .count(); + Value::Number(sum.into()) +} + // GET participants/{conference_id} data.participants[].isAnonymous -async fn extract_from_participant(participant : Arc, target: &str, conf_id: Arc, conf_desc: Arc) -> Option { +async fn extract_from_participant(participant : Arc, target: &str, conf_id: Arc, conf_desc: Arc, description: &str) -> Option { if let Some(value) = participant.get("params") .and_then(|params| params.get(target)) { + let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown"); + let id = participant.get("id").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown"); + let metric_name = format!("{}_{}_{}", target, conf_id, id); return Some( MetricOutputExtended::new_with_slices( - target, - target, + &metric_name, + &metric_name, "type", "Vinteo native", format!( - "Значение параметра `{}` для пользователя {} ({}) в конференции {} ({})", - target, - participant.get("watermark").unwrap_or_else(|| &Value::Null), - participant.get("id").unwrap_or_else(|| &Value::Null), + "{} для пользователя {} ({}) в конференции {} ({})", + description, + name, + id, conf_desc, conf_id ).as_ref(), @@ -214,18 +224,17 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> { info!("Initializing extraction mechanism ..."); let conferences = ConferencesDigital::new(confs); - let full_iterable = Arc::new(conferences.clone_confs()); let extracted_conference = conferences.go_across(); - let mut buffer: Vec = Vec::new(); + let mut buffer: Vec = Vec::new(); pin_mut!(extracted_conference); - while let Some(((id, desc), item)) = extracted_conference.next().await { + while let Some(((desc, id), item)) = extracted_conference.next().await { let parts_stream = across_participants(&item); pin_mut!(parts_stream); let total_participants = MetricOutputExtended::new_with_slices( - format!("TotalParticipantsIn{}", id).as_ref(), - format!("TotalParticipantsIn{}", id).as_ref(), + format!("TotalParticipants_{}", id).as_ref(), + format!("TotalParticipants_{}", id).as_ref(), "type", "Vinteo Native", format!("Общее количество участников в конференции {}", &desc).as_ref(), @@ -233,53 +242,70 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> { Some(String::from("module$12")), extract_total_participants(item.clone()).await ); + // anon NON_STABLE + let total_anon_participants = MetricOutputExtended::new_with_slices( + format!("TotalAnonymousParticipants_{}", id).as_ref(), + format!("TotalAnonymousParticipants_{}", id).as_ref(), + "type", + "Vinteo Native", + format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(), + Some(18), + Some(String::from("module$12")), + extract_total_anon_participants(item.clone()).await + ); + // description - // dbg!(total_participants); + buffer.push(total_participants); + buffer.push(total_anon_participants); while let Some(participant) = parts_stream.next().await { - let metrics = gather_futures( + buffer.append( + &mut gather_futures( participant, Arc::from(id.as_ref()), Arc::from(desc.as_ref()), vec![ - "txBitrate", - "rxBitrate", - "txFPS", - "rxFPS", - "rxLost", - "txLost", - "jBLen", - "txLostPercent", - "rxLostPercent", - "txH239Bitrate", - "rxH239Bitrate", - "txVideoCodec", - "rxVideoCodec", - "txAudioCodec", - "rxAudioCodec", - "txResolution", - "rxResolution", - "txH239Resolution", - "rxH239Resolution", - "duration", + ("txBitrate", "Скорость отправки данных"), + ("rxBitrate", "Скорость приёма данных"), + ("txFPS", "Количество отправляемых кадров в секунду"), + ("rxFPS", "Количество получаемых кадров в секунду"), + ("rxLost", "Количество потерянных пакетов на приёме"), + ("txLost", "Количество потерянных пакетов на отправке"), + ("jBLen", "Фазовое отклонение на цифровом канале"), + ("txLostPercent", "Процент потерянных пакетов на отправке"), + ("rxLostPercent", "Процент потерянных пакетов на приёме"), + ("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"), + ("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"), + ("txVideoCodec", "Используемый видеокодек на отправке"), + ("rxVideoCodec", "Используемый видеокодек на приёме"), + ("txAudioCodec", "Используемый аудиокодек на отправке"), + ("rxAudioCodec", "Используемый аудиокодек на приёме"), + ("txResolution", "Разрешение передаваемого видео"), + ("rxResolution", "Разрешение принимаемого видео"), + ("txH239Resolution", "Разрешение дополнительного видео-потока"), + ("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"), + ("duration", "Длительсность сеанса (в минутах)"), ], - ).await; - dbg!(metrics); + ).await + .into_iter() + .filter(|metric| metric.is_some()) + .map(|metric| metric.unwrap() ) + .collect::>() + ); } } // let metrics = Requester::get_metrics_from_jitter(jitter).await; - // if !metrics.metrics.is_empty() { - // match Exporter::export_extended_metrics(metrics).await { - // Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), - // Err(er) => error!("Cannot export data: {}", er), - // } - // } else { - // warn!("Metrics array is empty. Ignoring exporting ..."); - // } - // Exporter::export_extended_metrics(metrics) - + if !buffer.is_empty() { + let metrics = PrometheusMetricsExtended::new_zvks(buffer).await; + match Exporter::export_extended_metrics(metrics).await { + Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), + Err(er) => error!("Cannot export data: {}", er), + } + } else { + warn!("Metrics array is empty. Ignoring exporting ..."); + } tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } // get users' jitter info From 66b66b966b9c0d850540f339f2531caa2526d591 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 27 May 2025 02:43:21 -0400 Subject: [PATCH 3/3] adding tracing --- crates/api-grub/src/jitter.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/api-grub/src/jitter.rs b/crates/api-grub/src/jitter.rs index 19899c4..a06c3d8 100644 --- a/crates/api-grub/src/jitter.rs +++ b/crates/api-grub/src/jitter.rs @@ -3,7 +3,7 @@ use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended}; use std::sync::Arc; use reqwest::Client; use lazy_static::lazy_static; -use tracing::{error, info, warn}; +use tracing::{error, info, trace, warn}; use std::pin::Pin; use serde_json::{Value, from_str}; use crate::export::Exporter; @@ -55,6 +55,7 @@ impl Requester { }) } pub async fn get_conferences(&self) -> anyhow::Result { + trace!("getting conferences list from API"); let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT); let req = self.client.get(url) .timeout(tokio::time::Duration::from_secs(10)) @@ -64,6 +65,7 @@ impl Requester { Ok(from_str(&resp?.text().await?)?) } pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result { + trace!("extracting conferences list"); let mut hashset = Conferences::new(); match conferences.get("data") { None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")), @@ -100,6 +102,7 @@ impl Requester { Ok(hashset) } pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result { + trace!("getting users list for each conference from API ..."); let mut output = OutputConferences::new(); for conf in conferences { let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1); @@ -127,6 +130,7 @@ impl Requester { fn across_participants(conf: &Value) -> impl Stream> + '_ { + trace!("fetching participants for conf and going across ..."); let participants_iter = conf .get("data") .and_then(|data| data.get("participants")) @@ -143,6 +147,7 @@ impl Requester { } async fn gather_futures(parts: Arc, conf_id: Arc, conf_desc: Arc, targets: Vec<(&str, &str)>) -> Vec> { + trace!("extracting {} measures for current participant in current conference {} ...", targets.len(), conf_id); let mut futures: Vec> + Send>>> = Vec::new(); for (target, description) in targets { futures.push(Box::pin(extract_from_participant( @@ -182,6 +187,7 @@ async fn extract_total_anon_participants(conferences : Arc) -> Value { // GET participants/{conference_id} data.participants[].isAnonymous async fn extract_from_participant(participant : Arc, target: &str, conf_id: Arc, conf_desc: Arc, description: &str) -> Option { + trace!("extracting `{}` measure for current participant ...", target); if let Some(value) = participant.get("params") .and_then(|params| params.get(target)) { let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");