Merge pull request 'rc' (#37) from rc into master

pull/52/head v1.0.13
deployer3000 2025-05-27 11:49:08 +03:00
commit dfe52ddc49
2 changed files with 199 additions and 100 deletions

View File

@ -28,3 +28,5 @@ openssl = { version = "0.10", features = ["vendored"] }
tracing-subscriber = "0.3.19" tracing-subscriber = "0.3.19"
tracing = "0.1.41" tracing = "0.1.41"
lazy_static = "1.5.0" lazy_static = "1.5.0"
futures = "0.3.31"
async-stream = "0.3.6"

View File

@ -1,10 +1,14 @@
use std::collections::{HashMap, HashSet}; use std::{collections::{HashMap, HashSet}, future::Future};
use integr_structs::api::v3::PrometheusMetricsExtended; use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended};
use std::sync::Arc;
use reqwest::Client; use reqwest::Client;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tracing::{error, info, warn}; use tracing::{error, info, trace, warn};
use std::pin::Pin;
use serde_json::{Value, from_str}; use serde_json::{Value, from_str};
use crate::export::Exporter; use crate::export::Exporter;
use async_stream::stream;
use futures::{future, pin_mut, stream::Stream, StreamExt};
lazy_static! { lazy_static! {
static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences"); static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences");
@ -13,10 +17,26 @@ lazy_static! {
// conferences ids // conferences ids
type Conferences = HashSet<(String, String)>; type Conferences = HashSet<(String, String)>;
// hash map {CONFERENCE_ID - JITTER_VALUE} type OutputConferences = HashMap<(String, String), Value>;
type OutputConferences = HashMap<(String, String), u32>;
// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} #[derive(Debug, Clone)]
type Jitter = HashMap<(String, String), OutputConferences>; struct ConferencesDigital(OutputConferences);
impl ConferencesDigital {
pub fn new(item: OutputConferences) -> Self {
Self(item)
}
pub fn go_across(self) -> impl Stream<Item = ((String, String), Arc<Value>)> {
stream! {
for (key, value) in self.0 {
info!("Conference {} ({}) is processing ...", key.0, key.1);
yield (key, Arc::new(value));
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
}
}
// to handle async http requests // to handle async http requests
#[derive(Debug)] #[derive(Debug)]
@ -35,6 +55,7 @@ impl Requester {
}) })
} }
pub async fn get_conferences(&self) -> anyhow::Result<Value> { pub async fn get_conferences(&self) -> anyhow::Result<Value> {
trace!("getting conferences list from API");
let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT); let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
let req = self.client.get(url) let req = self.client.get(url)
.timeout(tokio::time::Duration::from_secs(10)) .timeout(tokio::time::Duration::from_secs(10))
@ -44,6 +65,7 @@ impl Requester {
Ok(from_str(&resp?.text().await?)?) Ok(from_str(&resp?.text().await?)?)
} }
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> { pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
trace!("extracting conferences list");
let mut hashset = Conferences::new(); let mut hashset = Conferences::new();
match conferences.get("data") { match conferences.get("data") {
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")), None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
@ -79,8 +101,9 @@ impl Requester {
} }
Ok(hashset) Ok(hashset)
} }
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<Jitter> { pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> {
let mut output = Jitter::new(); trace!("getting users list for each conference from API ...");
let mut output = OutputConferences::new();
for conf in conferences { for conf in conferences {
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1); let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
let req = self.client.get(url) let req = self.client.get(url)
@ -95,70 +118,7 @@ impl Requester {
Err(er) => error!("Cannot convert response into text: {}", er), Err(er) => error!("Cannot convert response into text: {}", er),
Ok(resp) => { Ok(resp) => {
let resp: Value = from_str(&resp)?; let resp: Value = from_str(&resp)?;
if let Some(data) = resp.get("data") { output.entry(conf).or_insert(resp);
if let Some(parts) = data.get("participants") {
if let Some(parts) = parts.as_array() {
for participant in parts {
// save id
let id = {
if let Some(id) = participant.get("id") {
let id = serde_json::to_string(id)?.replace("\"", "");
if let Some(watermark) = participant.get("watermark") {
match watermark.as_str() {
Some(watermark) => (watermark.to_string(), id),
None => (String::new(), id),
}
} else {
error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf);
(String::new(), id)
}
} else {
error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf);
continue;
}
};
if let Some(params) = participant.get("params") {
let jitter = {
if let Some(jitter) = params.get("jBLen") {
// find substring
let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", "");
// string to u32
if let Ok(jitter) = temp.parse::<u32>() {
jitter
} else {
error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf);
continue;
}
} else {
error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
continue;
}
};
// hm push
output.entry(id)
.and_modify(|map| {
map.entry(conf.clone())
.and_modify(|val| *val = jitter)
.or_insert(jitter); } )
.or_insert( {
let mut hm = OutputConferences::new();
hm.insert(conf.clone(), jitter);
hm
} );
} else {
error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
}
}
}
else {
return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array"))
}
} else {
return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field"))
}
} else {
return Err(anyhow::Error::msg("Invalid JSON format: no `data` field"))
}
} }
} }
}, },
@ -166,28 +126,94 @@ impl Requester {
} }
Ok(output) Ok(output)
} }
async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended { }
use integr_structs::api::v3::MetricOutputExtended;
let mut metrics = PrometheusMetricsExtended::new_empty_jitter();
// for ((name, id), confs) in jitter { fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ {
// confs.iter() trace!("fetching participants for conf and going across ...");
// .for_each(|((conf_name, conf_id), &value)| {todo!()}); let participants_iter = conf
// } .get("data")
jitter.iter() .and_then(|data| data.get("participants"))
.map(|(participant, confs)| (confs.iter(), participant)) .and_then(|participants| participants.as_array())
.for_each(|(conf_iter, (name, id))| { .into_iter()
conf_iter.for_each(|((conf_name, conf_id), &jitter)| { .flatten()
let metric_id = format!("j{}{}", id, conf_id); .cloned();
let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id);
let val = serde_json::to_value(jitter).unwrap_or_else(|er| { stream! {
error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er); for participant in participants_iter {
Value::Null yield Arc::new(participant);
});
metrics.add(MetricOutputExtended::new_with_slices(&metric_id, &metric_id,"int", "Vinteo native", &desc, None, None,val));
});
});
metrics
} }
}
}
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> {
trace!("extracting {} measures for current participant in current conference {} ...", targets.len(), conf_id);
let mut futures: Vec<Pin<Box<dyn futures::Future<Output = Option<MetricOutputExtended>> + Send>>> = Vec::new();
for (target, description) in targets {
futures.push(Box::pin(extract_from_participant(
parts.clone(),
target,
conf_id.clone(),
conf_desc.clone(),
description
)));
}
future::join_all(futures).await
}
// GET participants/{conference_id} data.total
async fn extract_total_participants(conferences : Arc<Value>) -> Value {
if let Some(total) = conferences
.get("data")
.and_then(|data| data.get("total")) {
return total.clone()
}
Value::Null
}
async fn extract_total_anon_participants(conferences : Arc<Value>) -> Value {
let sum = conferences
.get("data")
.and_then(|data| data.get("participants"))
.and_then(|parts| parts.as_array())
.iter()
.flat_map(|&val| val.iter())
.filter_map(|part| part.get("isAnonymous"))
.filter_map(|is_anon| is_anon.as_bool())
.filter(|is_anon| *is_anon == true)
.count();
Value::Number(sum.into())
}
// GET participants/{conference_id} data.participants[].isAnonymous
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> {
trace!("extracting `{}` measure for current participant ...", target);
if let Some(value) = participant.get("params")
.and_then(|params| params.get(target)) {
let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
let id = participant.get("id").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
let metric_name = format!("{}_{}_{}", target, conf_id, id);
return Some(
MetricOutputExtended::new_with_slices(
&metric_name,
&metric_name,
"type",
"Vinteo native",
format!(
"{} для пользователя {} ({}) в конференции {} ({})",
description,
name,
id,
conf_desc,
conf_id
).as_ref(),
Some(18),
Some("module$12".to_string()),
value.clone()
)
)
}
None
} }
pub async fn init_grubbing_jitter() -> anyhow::Result<()> { pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
@ -200,12 +226,85 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
let conferences = Requester::get_conferences_from_value(conferences).await?; let conferences = Requester::get_conferences_from_value(conferences).await?;
info!("Getting jitter metrics by user in each conference ..."); info!("Getting jitter metrics by user in each conference ...");
let jitter = requester.get_users_jitter_by_conference(conferences).await?; let confs = requester.get_users_jitter_by_conference(conferences).await?;
info!("Casting jitter metrics into Metrics format for Prometheus ..."); info!("Initializing extraction mechanism ...");
let metrics = Requester::get_metrics_from_jitter(jitter).await; let conferences = ConferencesDigital::new(confs);
let extracted_conference = conferences.go_across();
let mut buffer: Vec<MetricOutputExtended> = Vec::new();
pin_mut!(extracted_conference);
if !metrics.metrics.is_empty() { while let Some(((desc, id), item)) = extracted_conference.next().await {
let parts_stream = across_participants(&item);
pin_mut!(parts_stream);
let total_participants = MetricOutputExtended::new_with_slices(
format!("TotalParticipants_{}", id).as_ref(),
format!("TotalParticipants_{}", id).as_ref(),
"type",
"Vinteo Native",
format!("Общее количество участников в конференции {}", &desc).as_ref(),
Some(18),
Some(String::from("module$12")),
extract_total_participants(item.clone()).await
);
// anon NON_STABLE
let total_anon_participants = MetricOutputExtended::new_with_slices(
format!("TotalAnonymousParticipants_{}", id).as_ref(),
format!("TotalAnonymousParticipants_{}", id).as_ref(),
"type",
"Vinteo Native",
format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(),
Some(18),
Some(String::from("module$12")),
extract_total_anon_participants(item.clone()).await
);
// description
buffer.push(total_participants);
buffer.push(total_anon_participants);
while let Some(participant) = parts_stream.next().await {
buffer.append(
&mut gather_futures(
participant,
Arc::from(id.as_ref()),
Arc::from(desc.as_ref()),
vec![
("txBitrate", "Скорость отправки данных"),
("rxBitrate", "Скорость приёма данных"),
("txFPS", "Количество отправляемых кадров в секунду"),
("rxFPS", "Количество получаемых кадров в секунду"),
("rxLost", "Количество потерянных пакетов на приёме"),
("txLost", "Количество потерянных пакетов на отправке"),
("jBLen", "Фазовое отклонение на цифровом канале"),
("txLostPercent", "Процент потерянных пакетов на отправке"),
("rxLostPercent", "Процент потерянных пакетов на приёме"),
("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"),
("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"),
("txVideoCodec", "Используемый видеокодек на отправке"),
("rxVideoCodec", "Используемый видеокодек на приёме"),
("txAudioCodec", "Используемый аудиокодек на отправке"),
("rxAudioCodec", "Используемый аудиокодек на приёме"),
("txResolution", "Разрешение передаваемого видео"),
("rxResolution", "Разрешение принимаемого видео"),
("txH239Resolution", "Разрешение дополнительного видео-потока"),
("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"),
("duration", "Длительсность сеанса (в минутах)"),
],
).await
.into_iter()
.filter(|metric| metric.is_some())
.map(|metric| metric.unwrap() )
.collect::<Vec<MetricOutputExtended>>()
);
}
}
// let metrics = Requester::get_metrics_from_jitter(jitter).await;
if !buffer.is_empty() {
let metrics = PrometheusMetricsExtended::new_zvks(buffer).await;
match Exporter::export_extended_metrics(metrics).await { match Exporter::export_extended_metrics(metrics).await {
Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
Err(er) => error!("Cannot export data: {}", er), Err(er) => error!("Cannot export data: {}", er),
@ -213,8 +312,6 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
} else { } else {
warn!("Metrics array is empty. Ignoring exporting ..."); warn!("Metrics array is empty. Ignoring exporting ...");
} }
// Exporter::export_extended_metrics(metrics)
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
} }
// get users' jitter info // get users' jitter info