jitter finish
test-org/integration-module/pipeline/pr-rc This commit looks good Details

pull/27/head
prplV 2025-04-30 06:39:17 -04:00
parent 0ea8213346
commit 12ee29d699
2 changed files with 84 additions and 23 deletions

View File

@ -1,9 +1,10 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use integr_structs::api::v3::PrometheusMetricsExtended;
use reqwest::Client; use reqwest::Client;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tracing::{info, error}; use tracing::{error, info, warn};
use serde_json::{Value, from_str}; use serde_json::{Value, from_str};
use crate::export::Exporter;
lazy_static! { lazy_static! {
static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences"); static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences");
@ -11,11 +12,11 @@ lazy_static! {
} }
// conferences ids // conferences ids
type Conferences = HashSet<String>; type Conferences = HashSet<(String, String)>;
// hash map {CONFERENCE_ID - JITTER_VALUE} // hash map {CONFERENCE_ID - JITTER_VALUE}
type OutputConferences = HashMap<String, u32>; type OutputConferences = HashMap<(String, String), u32>;
// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} // hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}}
type Output = HashMap<String, OutputConferences>; type Jitter = HashMap<(String, String), OutputConferences>;
// to handle async http requests // to handle async http requests
#[derive(Debug)] #[derive(Debug)]
@ -54,7 +55,17 @@ impl Requester {
.for_each(|conf_obj| { .for_each(|conf_obj| {
if let Some(id) = conf_obj.get("number") { if let Some(id) = conf_obj.get("number") {
if let Some(id) = id.as_str() { if let Some(id) = id.as_str() {
hashset.insert(id.to_string()); let id = {
if let Some(desc) = conf_obj.get("description") {
match desc.as_str() {
Some(desc) => (desc.to_string(), id.to_string()),
None => (String::new(), id.to_string())
}
} else {
(String::new(), id.to_string())
}
};
hashset.insert(id);
} }
} }
}); });
@ -67,16 +78,16 @@ impl Requester {
} }
Ok(hashset) Ok(hashset)
} }
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<Output> { pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<Jitter> {
let mut output = Output::new(); let mut output = Jitter::new();
for conf in conferences { for conf in conferences {
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf); let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
let req = self.client.get(url) let req = self.client.get(url)
.header("accept", "application/json") .header("accept", "application/json")
.header("x-api-key", &self.api_key); .header("x-api-key", &self.api_key);
match req.send().await { match req.send().await {
Err(er) => error!("Cannot GET data about conference - {}: {}", conf, er), Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er),
Ok(resp) => { Ok(resp) => {
match resp.text().await { match resp.text().await {
Err(er) => error!("Cannot convert response into text: {}", er), Err(er) => error!("Cannot convert response into text: {}", er),
@ -89,9 +100,18 @@ impl Requester {
// save id // save id
let id = { let id = {
if let Some(id) = participant.get("id") { if let Some(id) = participant.get("id") {
serde_json::to_string(id)?.replace("\"", "") let id = serde_json::to_string(id)?.replace("\"", "");
if let Some(watermark) = participant.get("watermark") {
match watermark.as_str() {
Some(watermark) => (watermark.to_string(), id),
None => (String::new(), id),
}
} else { } else {
error!("No `id` field was found in participant from {} conference. Skipping ...", &conf); error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf);
(String::new(), id)
}
} else {
error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf);
continue; continue;
} }
}; };
@ -99,19 +119,16 @@ impl Requester {
let jitter = { let jitter = {
if let Some(jitter) = params.get("jBLen") { if let Some(jitter) = params.get("jBLen") {
// find substring // find substring
dbg!(&jitter);
let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", ""); let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", "");
// string to u32 // string to u32
dbg!(&temp);
if let Ok(jitter) = temp.parse::<u32>() { if let Ok(jitter) = temp.parse::<u32>() {
jitter jitter
} else { } else {
error!("Invalid type of `jBLen` field for participant {} in conference {}. Skipping ...", &id, &conf); error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf);
continue; continue;
} }
} else { } else {
error!("No `jBLen` field was found in participant {} from conference {}. Skipping ...", &id, &conf); error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
continue; continue;
} }
}; };
@ -127,7 +144,7 @@ impl Requester {
hm hm
} ); } );
} else { } else {
error!("No `params` field in participant {} from conference {}. Skipping ...", &id, &conf); error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
} }
} }
} }
@ -147,6 +164,28 @@ impl Requester {
} }
Ok(output) Ok(output)
} }
async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended {
use integr_structs::api::v3::MetricOutputExtended;
let mut metrics = PrometheusMetricsExtended::new_empty_jitter();
// for ((name, id), confs) in jitter {
// confs.iter()
// .for_each(|((conf_name, conf_id), &value)| {todo!()});
// }
jitter.iter()
.map(|(participant, confs)| (confs.iter(), participant))
.for_each(|(conf_iter, (name, id))| {
conf_iter.for_each(|((conf_name, conf_id), &jitter)| {
let metric_id = format!("j{}{}", id, conf_id);
let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id);
let val = serde_json::to_value(jitter).unwrap_or_else(|er| {
error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er);
Value::Null
});
metrics.add(MetricOutputExtended::new_with_slices(&metric_id, "int", "Vinteo native", &desc, val));
});
});
metrics
}
} }
pub async fn init_grubbing_jitter() -> anyhow::Result<()> { pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
@ -154,14 +193,26 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
let requester = Requester::new()?; let requester = Requester::new()?;
// get conference id's // get conference id's
loop { loop {
info!("Getting conferences array ...");
let conferences = requester.get_conferences().await?; let conferences = requester.get_conferences().await?;
let conferences = Requester::get_conferences_from_value(conferences).await?; let conferences = Requester::get_conferences_from_value(conferences).await?;
// conferences.iter()
// .for_each(|id| { info!("Getting jitter metrics by user in each conference ...");
// requester.get_users_jitter_by_conference(conference_id)
// });
let jitter = requester.get_users_jitter_by_conference(conferences).await?; let jitter = requester.get_users_jitter_by_conference(conferences).await?;
dbg!(jitter);
info!("Casting jitter metrics into Metrics format for Prometheus ...");
let metrics = Requester::get_metrics_from_jitter(jitter).await;
if !metrics.metrics.is_empty() {
match Exporter::export_extended_metrics(metrics).await {
Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
Err(er) => error!("Cannot export data: {}", er),
}
} else {
warn!("Metrics array is empty. Ignoring exporting ...");
}
// Exporter::export_extended_metrics(metrics)
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
} }
// get users' jitter info // get users' jitter info

View File

@ -321,6 +321,16 @@ pub mod v3 {
pub metrics: Vec<MetricOutputExtended>, pub metrics: Vec<MetricOutputExtended>,
} }
impl PrometheusMetricsExtended { impl PrometheusMetricsExtended {
pub fn new_empty_jitter() -> Self {
Self {
service_name : "zvks".to_owned(),
endpoint_name : "jitter".to_owned(),
metrics : Vec::new(),
}
}
pub fn add(&mut self, metric: MetricOutputExtended) {
self.metrics.push(metric);
}
pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self { pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
Self { Self {
service_name : "zvks".to_owned(), service_name : "zvks".to_owned(),