use std::collections::{HashMap, HashSet}; use integr_structs::api::v3::PrometheusMetricsExtended; use reqwest::Client; use lazy_static::lazy_static; use tracing::{error, info, warn}; use serde_json::{Value, from_str}; use crate::export::Exporter; lazy_static! { static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences"); static ref USERS_ENDPOINT: String = String::from("/api/v1/participants/"); } // conferences ids type Conferences = HashSet<(String, String)>; // hash map {CONFERENCE_ID - JITTER_VALUE} type OutputConferences = HashMap<(String, String), u32>; // hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} type Jitter = HashMap<(String, String), OutputConferences>; // to handle async http requests #[derive(Debug)] struct Requester { base: String, api_key: String, client: Client, } impl Requester { pub fn new() -> anyhow::Result { Ok(Self { base: String::from("https://demo.vcs.vinteo.dev"), api_key: std::env::var("VINTEO_API_KEY").unwrap(), client: Client::builder().user_agent("api-grub").build()?, }) } pub async fn get_conferences(&self) -> anyhow::Result { let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT); let req = self.client.get(url) .timeout(tokio::time::Duration::from_secs(10)) .header("accept", "application/json") .header("x-api-key", &self.api_key); let resp = req.send().await; Ok(from_str(&resp?.text().await?)?) } pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result { let mut hashset = Conferences::new(); match conferences.get("data") { None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")), Some(data) => { match data.get("conferences") { None => return Err(anyhow::Error::msg("Invalid JSON format: no `conferences` field")), Some(confs) => { if let Some(confs) = confs.as_array() { confs.iter() .for_each(|conf_obj| { if let Some(id) = conf_obj.get("number") { if let Some(id) = id.as_str() { let id = { if let Some(desc) = conf_obj.get("description") { match desc.as_str() { Some(desc) => (desc.to_string(), id.to_string()), None => (String::new(), id.to_string()) } } else { (String::new(), id.to_string()) } }; hashset.insert(id); } } }); } else { return Err(anyhow::Error::msg("Invalid JSON format: `conferences` is not an array")) } }, } }, } Ok(hashset) } pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result { let mut output = Jitter::new(); for conf in conferences { let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1); let req = self.client.get(url) .timeout(tokio::time::Duration::from_secs(10)) .header("accept", "application/json") .header("x-api-key", &self.api_key); match req.send().await { Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er), Ok(resp) => { match resp.text().await { Err(er) => error!("Cannot convert response into text: {}", er), Ok(resp) => { let resp: Value = from_str(&resp)?; if let Some(data) = resp.get("data") { if let Some(parts) = data.get("participants") { if let Some(parts) = parts.as_array() { for participant in parts { // save id let id = { if let Some(id) = participant.get("id") { let id = serde_json::to_string(id)?.replace("\"", ""); if let Some(watermark) = participant.get("watermark") { match watermark.as_str() { Some(watermark) => (watermark.to_string(), id), None => (String::new(), id), } } else { error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf); (String::new(), id) } } else { error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf); continue; } }; if let Some(params) = participant.get("params") { let jitter = { if let Some(jitter) = params.get("jBLen") { // find substring let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", ""); // string to u32 if let Ok(jitter) = temp.parse::() { jitter } else { error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf); continue; } } else { error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf); continue; } }; // hm push output.entry(id) .and_modify(|map| { map.entry(conf.clone()) .and_modify(|val| *val = jitter) .or_insert(jitter); } ) .or_insert( { let mut hm = OutputConferences::new(); hm.insert(conf.clone(), jitter); hm } ); } else { error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf); } } } else { return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array")) } } else { return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field")) } } else { return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")) } } } }, } } Ok(output) } async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended { use integr_structs::api::v3::MetricOutputExtended; let mut metrics = PrometheusMetricsExtended::new_empty_jitter(); // for ((name, id), confs) in jitter { // confs.iter() // .for_each(|((conf_name, conf_id), &value)| {todo!()}); // } jitter.iter() .map(|(participant, confs)| (confs.iter(), participant)) .for_each(|(conf_iter, (name, id))| { conf_iter.for_each(|((conf_name, conf_id), &jitter)| { let metric_id = format!("j{}{}", id, conf_id); let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id); let val = serde_json::to_value(jitter).unwrap_or_else(|er| { error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er); Value::Null }); metrics.add(MetricOutputExtended::new_with_slices(&metric_id, &metric_id,"int", "Vinteo native", &desc, None, None,val)); }); }); metrics } } pub async fn init_grubbing_jitter() -> anyhow::Result<()> { // create requester let requester = Requester::new()?; // get conference id's loop { info!("Getting conferences array ..."); let conferences = requester.get_conferences().await?; let conferences = Requester::get_conferences_from_value(conferences).await?; info!("Getting jitter metrics by user in each conference ..."); let jitter = requester.get_users_jitter_by_conference(conferences).await?; info!("Casting jitter metrics into Metrics format for Prometheus ..."); let metrics = Requester::get_metrics_from_jitter(jitter).await; if !metrics.metrics.is_empty() { match Exporter::export_extended_metrics(metrics).await { Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), Err(er) => error!("Cannot export data: {}", er), } } else { warn!("Metrics array is empty. Ignoring exporting ..."); } // Exporter::export_extended_metrics(metrics) tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } // get users' jitter info }