integration-module/crates/api-grub/src/jitter.rs

221 lines
12 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

use std::collections::{HashMap, HashSet};
use integr_structs::api::v3::PrometheusMetricsExtended;
use reqwest::Client;
use lazy_static::lazy_static;
use tracing::{error, info, warn};
use serde_json::{Value, from_str};
use crate::export::Exporter;
lazy_static! {
    /// Path of the conference-listing endpoint; appended to the requester's base URL.
    static ref CONFERENCES_ENDPOINT: String = "/api/v1/conferences".to_owned();
    /// Path prefix of the participants endpoint; the conference number is appended
    /// directly after the trailing slash.
    static ref USERS_ENDPOINT: String = "/api/v1/participants/".to_owned();
}
/// Set of known conferences. Each entry is a `(description, number)` pair, where
/// `description` is an empty string when the API response carries none.
type Conferences = HashSet<(String, String)>;
/// Per-conference jitter values: maps a conference `(description, number)` pair to the
/// integer parsed from the participant's `jBLen` field (the ` ms` suffix is stripped
/// before parsing, so the value is presumably in milliseconds).
type OutputConferences = HashMap<(String, String), u32>;
/// Jitter grouped by participant: maps a participant `(watermark, id)` pair to that
/// participant's per-conference jitter values.
type Jitter = HashMap<(String, String), OutputConferences>;
/// HTTP helper that issues the asynchronous API requests used to collect jitter data.
#[derive(Debug)]
struct Requester {
/// Scheme and host that every endpoint path is appended to.
base: String,
/// Value sent in the `x-api-key` header of every request.
api_key: String,
/// Shared `reqwest` client used for all outgoing requests.
client: Client,
}
impl Requester {
pub fn new() -> anyhow::Result<Self> {
Ok(Self {
base: String::from("https://demo.vcs.vinteo.dev"),
api_key: std::env::var("VINTEO_API_KEY").unwrap(),
client: Client::builder().user_agent("api-grub").build()?,
})
}
/// Fetches the conference list and returns the response body parsed as JSON.
///
/// The request carries the API key in the `x-api-key` header and times out after
/// 10 seconds. The HTTP status code is not inspected; whatever body comes back is
/// parsed.
///
/// # Errors
///
/// Returns an error when the request fails, the body cannot be read as text, or the
/// text is not valid JSON.
pub async fn get_conferences(&self) -> anyhow::Result<Value> {
    let endpoint = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
    let response = self
        .client
        .get(endpoint)
        .timeout(tokio::time::Duration::from_secs(10))
        .header("accept", "application/json")
        .header("x-api-key", &self.api_key)
        .send()
        .await?;
    let body = response.text().await?;
    let parsed: Value = from_str(&body)?;
    Ok(parsed)
}
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
let mut hashset = Conferences::new();
match conferences.get("data") {
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
Some(data) => {
match data.get("conferences") {
None => return Err(anyhow::Error::msg("Invalid JSON format: no `conferences` field")),
Some(confs) => {
if let Some(confs) = confs.as_array() {
confs.iter()
.for_each(|conf_obj| {
if let Some(id) = conf_obj.get("number") {
if let Some(id) = id.as_str() {
let id = {
if let Some(desc) = conf_obj.get("description") {
match desc.as_str() {
Some(desc) => (desc.to_string(), id.to_string()),
None => (String::new(), id.to_string())
}
} else {
(String::new(), id.to_string())
}
};
hashset.insert(id);
}
}
});
} else {
return Err(anyhow::Error::msg("Invalid JSON format: `conferences` is not an array"))
}
},
}
},
}
Ok(hashset)
}
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<Jitter> {
let mut output = Jitter::new();
for conf in conferences {
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
let req = self.client.get(url)
.timeout(tokio::time::Duration::from_secs(10))
.header("accept", "application/json")
.header("x-api-key", &self.api_key);
match req.send().await {
Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er),
Ok(resp) => {
match resp.text().await {
Err(er) => error!("Cannot convert response into text: {}", er),
Ok(resp) => {
let resp: Value = from_str(&resp)?;
if let Some(data) = resp.get("data") {
if let Some(parts) = data.get("participants") {
if let Some(parts) = parts.as_array() {
for participant in parts {
// save id
let id = {
if let Some(id) = participant.get("id") {
let id = serde_json::to_string(id)?.replace("\"", "");
if let Some(watermark) = participant.get("watermark") {
match watermark.as_str() {
Some(watermark) => (watermark.to_string(), id),
None => (String::new(), id),
}
} else {
error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf);
(String::new(), id)
}
} else {
error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf);
continue;
}
};
if let Some(params) = participant.get("params") {
let jitter = {
if let Some(jitter) = params.get("jBLen") {
// find substring
let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", "");
// string to u32
if let Ok(jitter) = temp.parse::<u32>() {
jitter
} else {
error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf);
continue;
}
} else {
error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
continue;
}
};
// hm push
output.entry(id)
.and_modify(|map| {
map.entry(conf.clone())
.and_modify(|val| *val = jitter)
.or_insert(jitter); } )
.or_insert( {
let mut hm = OutputConferences::new();
hm.insert(conf.clone(), jitter);
hm
} );
} else {
error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
}
}
}
else {
return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array"))
}
} else {
return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field"))
}
} else {
return Err(anyhow::Error::msg("Invalid JSON format: no `data` field"))
}
}
}
},
}
}
Ok(output)
}
async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended {
use integr_structs::api::v3::MetricOutputExtended;
let mut metrics = PrometheusMetricsExtended::new_empty_jitter();
// for ((name, id), confs) in jitter {
// confs.iter()
// .for_each(|((conf_name, conf_id), &value)| {todo!()});
// }
jitter.iter()
.map(|(participant, confs)| (confs.iter(), participant))
.for_each(|(conf_iter, (name, id))| {
conf_iter.for_each(|((conf_name, conf_id), &jitter)| {
let metric_id = format!("j{}{}", id, conf_id);
let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id);
let val = serde_json::to_value(jitter).unwrap_or_else(|er| {
error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er);
Value::Null
});
metrics.add(MetricOutputExtended::new_with_slices(&metric_id, "int", "Vinteo native", &desc, val));
});
});
metrics
}
}
/// Runs the jitter collection loop: fetch conferences, fetch per-participant jitter,
/// convert it to metrics, export them, then wait 5 seconds and repeat.
///
/// Export failures are logged and the loop continues. An empty metrics set is not
/// exported.
///
/// # Errors
///
/// Returns an error when the requester cannot be created, or as soon as fetching or
/// parsing the conferences or the participants' jitter fails.
pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
    let requester = Requester::new()?;
    loop {
        // NOTE(review): every `?` below ends the polling loop on the first failure;
        // confirm the caller restarts this task if that is not intended.
        info!("Getting conferences array ...");
        let raw_conferences = requester.get_conferences().await?;
        let conferences = Requester::get_conferences_from_value(raw_conferences).await?;

        info!("Getting jitter metrics by user in each conference ...");
        let jitter = requester.get_users_jitter_by_conference(conferences).await?;

        info!("Casting jitter metrics into Metrics format for Prometheus ...");
        let metrics = Requester::get_metrics_from_jitter(jitter).await;

        if metrics.metrics.is_empty() {
            warn!("Metrics array is empty. Ignoring exporting ...");
        } else {
            match Exporter::export_extended_metrics(metrics).await {
                Err(er) => error!("Cannot export data: {}", er),
                Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
            }
        }

        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}