metrics vinteo native update

pull/36/head
prplV 2025-05-26 09:19:57 -04:00
parent f4b560a454
commit af604c55a6
2 changed files with 168 additions and 101 deletions

View File

@ -28,3 +28,5 @@ openssl = { version = "0.10", features = ["vendored"] }
tracing-subscriber = "0.3.19" tracing-subscriber = "0.3.19"
tracing = "0.1.41" tracing = "0.1.41"
lazy_static = "1.5.0" lazy_static = "1.5.0"
futures = "0.3.31"
async-stream = "0.3.6"

View File

@ -1,10 +1,14 @@
use std::collections::{HashMap, HashSet}; use std::{collections::{HashMap, HashSet}, future::Future};
use integr_structs::api::v3::PrometheusMetricsExtended; use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended};
use std::sync::Arc;
use reqwest::Client; use reqwest::Client;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use std::pin::Pin;
use serde_json::{Value, from_str}; use serde_json::{Value, from_str};
use crate::export::Exporter; use crate::export::Exporter;
use async_stream::stream;
use futures::{future, pin_mut, stream::Stream, StreamExt};
lazy_static! { lazy_static! {
static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences"); static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences");
@ -13,10 +17,35 @@ lazy_static! {
// conferences ids // conferences ids
type Conferences = HashSet<(String, String)>; type Conferences = HashSet<(String, String)>;
// hash map {CONFERENCE_ID - JITTER_VALUE} // all participants per each conference
type OutputConferences = HashMap<(String, String), u32>; // type AllConferencesInfo = Vec<Value>;
// hash map {CONFERENCE_ID - CONFERENCE_VALUE}
type OutputConferences = HashMap<(String, String), Value>;
// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} // hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}}
type Jitter = HashMap<(String, String), OutputConferences>; // type Jitter = HashMap<(String, String), OutputConferences>;
#[derive(Debug, Clone)]
struct ConferencesDigital(OutputConferences);
impl ConferencesDigital {
pub fn new(item: OutputConferences) -> Self {
Self(item)
}
pub fn go_across(self) -> impl Stream<Item = ((String, String), Arc<Value>)> {
stream! {
for (key, value) in self.0 {
info!("Conference {} ({}) is processing ...", key.0, key.1);
yield (key, Arc::new(value));
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
}
pub fn clone_confs(&self) -> OutputConferences {
self.0.clone()
}
}
// to handle async http requests // to handle async http requests
#[derive(Debug)] #[derive(Debug)]
@ -79,8 +108,8 @@ impl Requester {
} }
Ok(hashset) Ok(hashset)
} }
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<Jitter> { pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> {
let mut output = Jitter::new(); let mut output = OutputConferences::new();
for conf in conferences { for conf in conferences {
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1); let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
let req = self.client.get(url) let req = self.client.get(url)
@ -95,70 +124,7 @@ impl Requester {
Err(er) => error!("Cannot convert response into text: {}", er), Err(er) => error!("Cannot convert response into text: {}", er),
Ok(resp) => { Ok(resp) => {
let resp: Value = from_str(&resp)?; let resp: Value = from_str(&resp)?;
if let Some(data) = resp.get("data") { output.entry(conf).or_insert(resp);
if let Some(parts) = data.get("participants") {
if let Some(parts) = parts.as_array() {
for participant in parts {
// save id
let id = {
if let Some(id) = participant.get("id") {
let id = serde_json::to_string(id)?.replace("\"", "");
if let Some(watermark) = participant.get("watermark") {
match watermark.as_str() {
Some(watermark) => (watermark.to_string(), id),
None => (String::new(), id),
}
} else {
error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf);
(String::new(), id)
}
} else {
error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf);
continue;
}
};
if let Some(params) = participant.get("params") {
let jitter = {
if let Some(jitter) = params.get("jBLen") {
// find substring
let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", "");
// string to u32
if let Ok(jitter) = temp.parse::<u32>() {
jitter
} else {
error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf);
continue;
}
} else {
error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
continue;
}
};
// hm push
output.entry(id)
.and_modify(|map| {
map.entry(conf.clone())
.and_modify(|val| *val = jitter)
.or_insert(jitter); } )
.or_insert( {
let mut hm = OutputConferences::new();
hm.insert(conf.clone(), jitter);
hm
} );
} else {
error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
}
}
}
else {
return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array"))
}
} else {
return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field"))
}
} else {
return Err(anyhow::Error::msg("Invalid JSON format: no `data` field"))
}
} }
} }
}, },
@ -166,28 +132,72 @@ impl Requester {
} }
Ok(output) Ok(output)
} }
async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended { }
use integr_structs::api::v3::MetricOutputExtended;
let mut metrics = PrometheusMetricsExtended::new_empty_jitter();
// for ((name, id), confs) in jitter { fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ {
// confs.iter() let participants_iter = conf
// .for_each(|((conf_name, conf_id), &value)| {todo!()}); .get("data")
// } .and_then(|data| data.get("participants"))
jitter.iter() .and_then(|participants| participants.as_array())
.map(|(participant, confs)| (confs.iter(), participant)) .into_iter()
.for_each(|(conf_iter, (name, id))| { .flatten()
conf_iter.for_each(|((conf_name, conf_id), &jitter)| { .cloned();
let metric_id = format!("j{}{}", id, conf_id);
let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id); stream! {
let val = serde_json::to_value(jitter).unwrap_or_else(|er| { for participant in participants_iter {
error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er); yield Arc::new(participant);
Value::Null }
}); }
metrics.add(MetricOutputExtended::new_with_slices(&metric_id, &metric_id,"int", "Vinteo native", &desc, None, None,val));
});
});
metrics
} }
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<&str>) -> Vec<Option<MetricOutputExtended>> {
let mut futures: Vec<Pin<Box<dyn futures::Future<Output = Option<MetricOutputExtended>> + Send>>> = Vec::new();
for target in targets {
futures.push(Box::pin(extract_from_participant(
parts.clone(),
target,
conf_id.clone(),
conf_desc.clone()
)));
}
future::join_all(futures).await
}
// GET participants/{conference_id} data.total
async fn extract_total_participants(conferences : Arc<Value>) -> Value {
if let Some(total) = conferences
.get("data")
.and_then(|data| data.get("total")) {
return total.clone()
}
Value::Null
}
// GET participants/{conference_id} data.participants[].isAnonymous
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>) -> Option<MetricOutputExtended> {
if let Some(value) = participant.get("params")
.and_then(|params| params.get(target)) {
return Some(
MetricOutputExtended::new_with_slices(
target,
target,
"type",
"Vinteo native",
format!(
"Значение параметра `{}` для пользователя {} ({}) в конференции {} ({})",
target,
participant.get("watermark").unwrap_or_else(|| &Value::Null),
participant.get("id").unwrap_or_else(|| &Value::Null),
conf_desc,
conf_id
).as_ref(),
Some(18),
Some("module$12".to_string()),
value.clone()
)
)
}
None
} }
pub async fn init_grubbing_jitter() -> anyhow::Result<()> { pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
@ -200,19 +210,74 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
let conferences = Requester::get_conferences_from_value(conferences).await?; let conferences = Requester::get_conferences_from_value(conferences).await?;
info!("Getting jitter metrics by user in each conference ..."); info!("Getting jitter metrics by user in each conference ...");
let jitter = requester.get_users_jitter_by_conference(conferences).await?; let confs = requester.get_users_jitter_by_conference(conferences).await?;
info!("Casting jitter metrics into Metrics format for Prometheus ..."); info!("Initializing extraction mechanism ...");
let metrics = Requester::get_metrics_from_jitter(jitter).await; let conferences = ConferencesDigital::new(confs);
let full_iterable = Arc::new(conferences.clone_confs());
let extracted_conference = conferences.go_across();
let mut buffer: Vec<PrometheusMetricsExtended> = Vec::new();
pin_mut!(extracted_conference);
if !metrics.metrics.is_empty() { while let Some(((id, desc), item)) = extracted_conference.next().await {
match Exporter::export_extended_metrics(metrics).await { let parts_stream = across_participants(&item);
Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), pin_mut!(parts_stream);
Err(er) => error!("Cannot export data: {}", er),
let total_participants = MetricOutputExtended::new_with_slices(
format!("TotalParticipantsIn{}", id).as_ref(),
format!("TotalParticipantsIn{}", id).as_ref(),
"type",
"Vinteo Native",
format!("Общее количество участников в конференции {}", &desc).as_ref(),
Some(18),
Some(String::from("module$12")),
extract_total_participants(item.clone()).await
);
// dbg!(total_participants);
while let Some(participant) = parts_stream.next().await {
let metrics = gather_futures(
participant,
Arc::from(id.as_ref()),
Arc::from(desc.as_ref()),
vec![
"txBitrate",
"rxBitrate",
"txFPS",
"rxFPS",
"rxLost",
"txLost",
"jBLen",
"txLostPercent",
"rxLostPercent",
"txH239Bitrate",
"rxH239Bitrate",
"txVideoCodec",
"rxVideoCodec",
"txAudioCodec",
"rxAudioCodec",
"txResolution",
"rxResolution",
"txH239Resolution",
"rxH239Resolution",
"duration",
],
).await;
dbg!(metrics);
} }
} else {
warn!("Metrics array is empty. Ignoring exporting ...");
} }
// let metrics = Requester::get_metrics_from_jitter(jitter).await;
// if !metrics.metrics.is_empty() {
// match Exporter::export_extended_metrics(metrics).await {
// Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
// Err(er) => error!("Cannot export data: {}", er),
// }
// } else {
// warn!("Metrics array is empty. Ignoring exporting ...");
// }
// Exporter::export_extended_metrics(metrics) // Exporter::export_extended_metrics(metrics)
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;