feature/178 #36
|
|
@ -28,3 +28,5 @@ openssl = { version = "0.10", features = ["vendored"] }
|
||||||
tracing-subscriber = "0.3.19"
|
tracing-subscriber = "0.3.19"
|
||||||
tracing = "0.1.41"
|
tracing = "0.1.41"
|
||||||
lazy_static = "1.5.0"
|
lazy_static = "1.5.0"
|
||||||
|
futures = "0.3.31"
|
||||||
|
async-stream = "0.3.6"
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,14 @@
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::{collections::{HashMap, HashSet}, future::Future};
|
||||||
use integr_structs::api::v3::PrometheusMetricsExtended;
|
use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended};
|
||||||
|
use std::sync::Arc;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, trace, warn};
|
||||||
|
use std::pin::Pin;
|
||||||
use serde_json::{Value, from_str};
|
use serde_json::{Value, from_str};
|
||||||
use crate::export::Exporter;
|
use crate::export::Exporter;
|
||||||
|
use async_stream::stream;
|
||||||
|
use futures::{future, pin_mut, stream::Stream, StreamExt};
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences");
|
static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences");
|
||||||
|
|
@ -13,10 +17,26 @@ lazy_static! {
|
||||||
|
|
||||||
// conferences ids
|
// conferences ids
|
||||||
type Conferences = HashSet<(String, String)>;
|
type Conferences = HashSet<(String, String)>;
|
||||||
// hash map {CONFERENCE_ID - JITTER_VALUE}
|
type OutputConferences = HashMap<(String, String), Value>;
|
||||||
type OutputConferences = HashMap<(String, String), u32>;
|
|
||||||
// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}}
|
#[derive(Debug, Clone)]
|
||||||
type Jitter = HashMap<(String, String), OutputConferences>;
|
struct ConferencesDigital(OutputConferences);
|
||||||
|
|
||||||
|
impl ConferencesDigital {
|
||||||
|
pub fn new(item: OutputConferences) -> Self {
|
||||||
|
Self(item)
|
||||||
|
}
|
||||||
|
pub fn go_across(self) -> impl Stream<Item = ((String, String), Arc<Value>)> {
|
||||||
|
stream! {
|
||||||
|
for (key, value) in self.0 {
|
||||||
|
info!("Conference {} ({}) is processing ...", key.0, key.1);
|
||||||
|
yield (key, Arc::new(value));
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// to handle async http requests
|
// to handle async http requests
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -35,6 +55,7 @@ impl Requester {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
pub async fn get_conferences(&self) -> anyhow::Result<Value> {
|
pub async fn get_conferences(&self) -> anyhow::Result<Value> {
|
||||||
|
trace!("getting conferences list from API");
|
||||||
let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
|
let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
|
||||||
let req = self.client.get(url)
|
let req = self.client.get(url)
|
||||||
.timeout(tokio::time::Duration::from_secs(10))
|
.timeout(tokio::time::Duration::from_secs(10))
|
||||||
|
|
@ -44,6 +65,7 @@ impl Requester {
|
||||||
Ok(from_str(&resp?.text().await?)?)
|
Ok(from_str(&resp?.text().await?)?)
|
||||||
}
|
}
|
||||||
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
|
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
|
||||||
|
trace!("extracting conferences list");
|
||||||
let mut hashset = Conferences::new();
|
let mut hashset = Conferences::new();
|
||||||
match conferences.get("data") {
|
match conferences.get("data") {
|
||||||
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
|
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
|
||||||
|
|
@ -79,8 +101,9 @@ impl Requester {
|
||||||
}
|
}
|
||||||
Ok(hashset)
|
Ok(hashset)
|
||||||
}
|
}
|
||||||
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<Jitter> {
|
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> {
|
||||||
let mut output = Jitter::new();
|
trace!("getting users list for each conference from API ...");
|
||||||
|
let mut output = OutputConferences::new();
|
||||||
for conf in conferences {
|
for conf in conferences {
|
||||||
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
|
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
|
||||||
let req = self.client.get(url)
|
let req = self.client.get(url)
|
||||||
|
|
@ -95,70 +118,7 @@ impl Requester {
|
||||||
Err(er) => error!("Cannot convert response into text: {}", er),
|
Err(er) => error!("Cannot convert response into text: {}", er),
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
let resp: Value = from_str(&resp)?;
|
let resp: Value = from_str(&resp)?;
|
||||||
if let Some(data) = resp.get("data") {
|
output.entry(conf).or_insert(resp);
|
||||||
if let Some(parts) = data.get("participants") {
|
|
||||||
if let Some(parts) = parts.as_array() {
|
|
||||||
for participant in parts {
|
|
||||||
// save id
|
|
||||||
let id = {
|
|
||||||
if let Some(id) = participant.get("id") {
|
|
||||||
let id = serde_json::to_string(id)?.replace("\"", "");
|
|
||||||
if let Some(watermark) = participant.get("watermark") {
|
|
||||||
match watermark.as_str() {
|
|
||||||
Some(watermark) => (watermark.to_string(), id),
|
|
||||||
None => (String::new(), id),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf);
|
|
||||||
(String::new(), id)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if let Some(params) = participant.get("params") {
|
|
||||||
let jitter = {
|
|
||||||
if let Some(jitter) = params.get("jBLen") {
|
|
||||||
// find substring
|
|
||||||
let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", "");
|
|
||||||
// string to u32
|
|
||||||
if let Ok(jitter) = temp.parse::<u32>() {
|
|
||||||
jitter
|
|
||||||
} else {
|
|
||||||
error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// hm push
|
|
||||||
output.entry(id)
|
|
||||||
.and_modify(|map| {
|
|
||||||
map.entry(conf.clone())
|
|
||||||
.and_modify(|val| *val = jitter)
|
|
||||||
.or_insert(jitter); } )
|
|
||||||
.or_insert( {
|
|
||||||
let mut hm = OutputConferences::new();
|
|
||||||
hm.insert(conf.clone(), jitter);
|
|
||||||
hm
|
|
||||||
} );
|
|
||||||
} else {
|
|
||||||
error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array"))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field"))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(anyhow::Error::msg("Invalid JSON format: no `data` field"))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
@ -166,28 +126,94 @@ impl Requester {
|
||||||
}
|
}
|
||||||
Ok(output)
|
Ok(output)
|
||||||
}
|
}
|
||||||
async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended {
|
}
|
||||||
use integr_structs::api::v3::MetricOutputExtended;
|
|
||||||
let mut metrics = PrometheusMetricsExtended::new_empty_jitter();
|
|
||||||
// for ((name, id), confs) in jitter {
|
fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ {
|
||||||
// confs.iter()
|
trace!("fetching participants for conf and going across ...");
|
||||||
// .for_each(|((conf_name, conf_id), &value)| {todo!()});
|
let participants_iter = conf
|
||||||
// }
|
.get("data")
|
||||||
jitter.iter()
|
.and_then(|data| data.get("participants"))
|
||||||
.map(|(participant, confs)| (confs.iter(), participant))
|
.and_then(|participants| participants.as_array())
|
||||||
.for_each(|(conf_iter, (name, id))| {
|
.into_iter()
|
||||||
conf_iter.for_each(|((conf_name, conf_id), &jitter)| {
|
.flatten()
|
||||||
let metric_id = format!("j{}{}", id, conf_id);
|
.cloned();
|
||||||
let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id);
|
|
||||||
let val = serde_json::to_value(jitter).unwrap_or_else(|er| {
|
stream! {
|
||||||
error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er);
|
for participant in participants_iter {
|
||||||
Value::Null
|
yield Arc::new(participant);
|
||||||
});
|
|
||||||
metrics.add(MetricOutputExtended::new_with_slices(&metric_id, &metric_id,"int", "Vinteo native", &desc, None, None,val));
|
|
||||||
});
|
|
||||||
});
|
|
||||||
metrics
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> {
|
||||||
|
trace!("extracting {} measures for current participant in current conference {} ...", targets.len(), conf_id);
|
||||||
|
let mut futures: Vec<Pin<Box<dyn futures::Future<Output = Option<MetricOutputExtended>> + Send>>> = Vec::new();
|
||||||
|
for (target, description) in targets {
|
||||||
|
futures.push(Box::pin(extract_from_participant(
|
||||||
|
parts.clone(),
|
||||||
|
target,
|
||||||
|
conf_id.clone(),
|
||||||
|
conf_desc.clone(),
|
||||||
|
description
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
future::join_all(futures).await
|
||||||
|
}
|
||||||
|
|
||||||
|
// GET participants/{conference_id} data.total
|
||||||
|
async fn extract_total_participants(conferences : Arc<Value>) -> Value {
|
||||||
|
if let Some(total) = conferences
|
||||||
|
.get("data")
|
||||||
|
.and_then(|data| data.get("total")) {
|
||||||
|
return total.clone()
|
||||||
|
}
|
||||||
|
Value::Null
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn extract_total_anon_participants(conferences : Arc<Value>) -> Value {
|
||||||
|
let sum = conferences
|
||||||
|
.get("data")
|
||||||
|
.and_then(|data| data.get("participants"))
|
||||||
|
.and_then(|parts| parts.as_array())
|
||||||
|
.iter()
|
||||||
|
.flat_map(|&val| val.iter())
|
||||||
|
.filter_map(|part| part.get("isAnonymous"))
|
||||||
|
.filter_map(|is_anon| is_anon.as_bool())
|
||||||
|
.filter(|is_anon| *is_anon == true)
|
||||||
|
.count();
|
||||||
|
Value::Number(sum.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
// GET participants/{conference_id} data.participants[].isAnonymous
|
||||||
|
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> {
|
||||||
|
trace!("extracting `{}` measure for current participant ...", target);
|
||||||
|
if let Some(value) = participant.get("params")
|
||||||
|
.and_then(|params| params.get(target)) {
|
||||||
|
let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
|
||||||
|
let id = participant.get("id").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
|
||||||
|
let metric_name = format!("{}_{}_{}", target, conf_id, id);
|
||||||
|
return Some(
|
||||||
|
MetricOutputExtended::new_with_slices(
|
||||||
|
&metric_name,
|
||||||
|
&metric_name,
|
||||||
|
"type",
|
||||||
|
"Vinteo native",
|
||||||
|
format!(
|
||||||
|
"{} для пользователя {} ({}) в конференции {} ({})",
|
||||||
|
description,
|
||||||
|
name,
|
||||||
|
id,
|
||||||
|
conf_desc,
|
||||||
|
conf_id
|
||||||
|
).as_ref(),
|
||||||
|
Some(18),
|
||||||
|
Some("module$12".to_string()),
|
||||||
|
value.clone()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
|
pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
|
||||||
|
|
@ -200,12 +226,85 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
|
||||||
let conferences = Requester::get_conferences_from_value(conferences).await?;
|
let conferences = Requester::get_conferences_from_value(conferences).await?;
|
||||||
|
|
||||||
info!("Getting jitter metrics by user in each conference ...");
|
info!("Getting jitter metrics by user in each conference ...");
|
||||||
let jitter = requester.get_users_jitter_by_conference(conferences).await?;
|
let confs = requester.get_users_jitter_by_conference(conferences).await?;
|
||||||
|
|
||||||
info!("Casting jitter metrics into Metrics format for Prometheus ...");
|
info!("Initializing extraction mechanism ...");
|
||||||
let metrics = Requester::get_metrics_from_jitter(jitter).await;
|
let conferences = ConferencesDigital::new(confs);
|
||||||
|
let extracted_conference = conferences.go_across();
|
||||||
|
let mut buffer: Vec<MetricOutputExtended> = Vec::new();
|
||||||
|
pin_mut!(extracted_conference);
|
||||||
|
|
||||||
if !metrics.metrics.is_empty() {
|
while let Some(((desc, id), item)) = extracted_conference.next().await {
|
||||||
|
let parts_stream = across_participants(&item);
|
||||||
|
pin_mut!(parts_stream);
|
||||||
|
|
||||||
|
let total_participants = MetricOutputExtended::new_with_slices(
|
||||||
|
format!("TotalParticipants_{}", id).as_ref(),
|
||||||
|
format!("TotalParticipants_{}", id).as_ref(),
|
||||||
|
"type",
|
||||||
|
"Vinteo Native",
|
||||||
|
format!("Общее количество участников в конференции {}", &desc).as_ref(),
|
||||||
|
Some(18),
|
||||||
|
Some(String::from("module$12")),
|
||||||
|
extract_total_participants(item.clone()).await
|
||||||
|
);
|
||||||
|
// anon NON_STABLE
|
||||||
|
let total_anon_participants = MetricOutputExtended::new_with_slices(
|
||||||
|
format!("TotalAnonymousParticipants_{}", id).as_ref(),
|
||||||
|
format!("TotalAnonymousParticipants_{}", id).as_ref(),
|
||||||
|
"type",
|
||||||
|
"Vinteo Native",
|
||||||
|
format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(),
|
||||||
|
Some(18),
|
||||||
|
Some(String::from("module$12")),
|
||||||
|
extract_total_anon_participants(item.clone()).await
|
||||||
|
);
|
||||||
|
// description
|
||||||
|
|
||||||
|
buffer.push(total_participants);
|
||||||
|
buffer.push(total_anon_participants);
|
||||||
|
|
||||||
|
while let Some(participant) = parts_stream.next().await {
|
||||||
|
buffer.append(
|
||||||
|
&mut gather_futures(
|
||||||
|
participant,
|
||||||
|
Arc::from(id.as_ref()),
|
||||||
|
Arc::from(desc.as_ref()),
|
||||||
|
vec![
|
||||||
|
("txBitrate", "Скорость отправки данных"),
|
||||||
|
("rxBitrate", "Скорость приёма данных"),
|
||||||
|
("txFPS", "Количество отправляемых кадров в секунду"),
|
||||||
|
("rxFPS", "Количество получаемых кадров в секунду"),
|
||||||
|
("rxLost", "Количество потерянных пакетов на приёме"),
|
||||||
|
("txLost", "Количество потерянных пакетов на отправке"),
|
||||||
|
("jBLen", "Фазовое отклонение на цифровом канале"),
|
||||||
|
("txLostPercent", "Процент потерянных пакетов на отправке"),
|
||||||
|
("rxLostPercent", "Процент потерянных пакетов на приёме"),
|
||||||
|
("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"),
|
||||||
|
("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"),
|
||||||
|
("txVideoCodec", "Используемый видеокодек на отправке"),
|
||||||
|
("rxVideoCodec", "Используемый видеокодек на приёме"),
|
||||||
|
("txAudioCodec", "Используемый аудиокодек на отправке"),
|
||||||
|
("rxAudioCodec", "Используемый аудиокодек на приёме"),
|
||||||
|
("txResolution", "Разрешение передаваемого видео"),
|
||||||
|
("rxResolution", "Разрешение принимаемого видео"),
|
||||||
|
("txH239Resolution", "Разрешение дополнительного видео-потока"),
|
||||||
|
("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"),
|
||||||
|
("duration", "Длительсность сеанса (в минутах)"),
|
||||||
|
],
|
||||||
|
).await
|
||||||
|
.into_iter()
|
||||||
|
.filter(|metric| metric.is_some())
|
||||||
|
.map(|metric| metric.unwrap() )
|
||||||
|
.collect::<Vec<MetricOutputExtended>>()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// let metrics = Requester::get_metrics_from_jitter(jitter).await;
|
||||||
|
|
||||||
|
if !buffer.is_empty() {
|
||||||
|
let metrics = PrometheusMetricsExtended::new_zvks(buffer).await;
|
||||||
match Exporter::export_extended_metrics(metrics).await {
|
match Exporter::export_extended_metrics(metrics).await {
|
||||||
Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
|
Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
|
||||||
Err(er) => error!("Cannot export data: {}", er),
|
Err(er) => error!("Cannot export data: {}", er),
|
||||||
|
|
@ -213,8 +312,6 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
|
||||||
} else {
|
} else {
|
||||||
warn!("Metrics array is empty. Ignoring exporting ...");
|
warn!("Metrics array is empty. Ignoring exporting ...");
|
||||||
}
|
}
|
||||||
// Exporter::export_extended_metrics(metrics)
|
|
||||||
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
||||||
}
|
}
|
||||||
// get users' jitter info
|
// get users' jitter info
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue