fix and finish

pull/36/head
prplV 2025-05-26 13:28:15 -04:00
parent af604c55a6
commit 7cc7c0799a
1 changed files with 84 additions and 58 deletions

View File

@ -17,12 +17,7 @@ lazy_static! {
// conferences ids
type Conferences = HashSet<(String, String)>;
// all participants per each conference
// type AllConferencesInfo = Vec<Value>;
// hash map {CONFERENCE_ID - CONFERENCE_VALUE}
type OutputConferences = HashMap<(String, String), Value>;
// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}}
// type Jitter = HashMap<(String, String), OutputConferences>;
#[derive(Debug, Clone)]
struct ConferencesDigital(OutputConferences);
@ -40,10 +35,6 @@ impl ConferencesDigital {
}
}
}
pub fn clone_confs(&self) -> OutputConferences {
self.0.clone()
}
}
@ -151,14 +142,15 @@ impl Requester {
}
}
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<&str>) -> Vec<Option<MetricOutputExtended>> {
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> {
let mut futures: Vec<Pin<Box<dyn futures::Future<Output = Option<MetricOutputExtended>> + Send>>> = Vec::new();
for target in targets {
for (target, description) in targets {
futures.push(Box::pin(extract_from_participant(
parts.clone(),
target,
conf_id.clone(),
conf_desc.clone()
conf_desc.clone(),
description
)));
}
future::join_all(futures).await
@ -173,21 +165,39 @@ async fn extract_total_participants(conferences : Arc<Value>) -> Value {
}
Value::Null
}
async fn extract_total_anon_participants(conferences : Arc<Value>) -> Value {
let sum = conferences
.get("data")
.and_then(|data| data.get("participants"))
.and_then(|parts| parts.as_array())
.iter()
.flat_map(|&val| val.iter())
.filter_map(|part| part.get("isAnonymous"))
.filter_map(|is_anon| is_anon.as_bool())
.filter(|is_anon| *is_anon == true)
.count();
Value::Number(sum.into())
}
// GET participants/{conference_id} data.participants[].isAnonymous
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>) -> Option<MetricOutputExtended> {
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> {
if let Some(value) = participant.get("params")
.and_then(|params| params.get(target)) {
let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
let id = participant.get("id").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
let metric_name = format!("{}_{}_{}", target, conf_id, id);
return Some(
MetricOutputExtended::new_with_slices(
target,
target,
&metric_name,
&metric_name,
"type",
"Vinteo native",
format!(
"Значение параметра `{}` для пользователя {} ({}) в конференции {} ({})",
target,
participant.get("watermark").unwrap_or_else(|| &Value::Null),
participant.get("id").unwrap_or_else(|| &Value::Null),
"{} для пользователя {} ({}) в конференции {} ({})",
description,
name,
id,
conf_desc,
conf_id
).as_ref(),
@ -214,18 +224,17 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
info!("Initializing extraction mechanism ...");
let conferences = ConferencesDigital::new(confs);
let full_iterable = Arc::new(conferences.clone_confs());
let extracted_conference = conferences.go_across();
let mut buffer: Vec<PrometheusMetricsExtended> = Vec::new();
let mut buffer: Vec<MetricOutputExtended> = Vec::new();
pin_mut!(extracted_conference);
while let Some(((id, desc), item)) = extracted_conference.next().await {
while let Some(((desc, id), item)) = extracted_conference.next().await {
let parts_stream = across_participants(&item);
pin_mut!(parts_stream);
let total_participants = MetricOutputExtended::new_with_slices(
format!("TotalParticipantsIn{}", id).as_ref(),
format!("TotalParticipantsIn{}", id).as_ref(),
format!("TotalParticipants_{}", id).as_ref(),
format!("TotalParticipants_{}", id).as_ref(),
"type",
"Vinteo Native",
format!("Общее количество участников в конференции {}", &desc).as_ref(),
@ -233,53 +242,70 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
Some(String::from("module$12")),
extract_total_participants(item.clone()).await
);
// anon NON_STABLE
let total_anon_participants = MetricOutputExtended::new_with_slices(
format!("TotalAnonymousParticipants_{}", id).as_ref(),
format!("TotalAnonymousParticipants_{}", id).as_ref(),
"type",
"Vinteo Native",
format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(),
Some(18),
Some(String::from("module$12")),
extract_total_anon_participants(item.clone()).await
);
// description
// dbg!(total_participants);
buffer.push(total_participants);
buffer.push(total_anon_participants);
while let Some(participant) = parts_stream.next().await {
let metrics = gather_futures(
buffer.append(
&mut gather_futures(
participant,
Arc::from(id.as_ref()),
Arc::from(desc.as_ref()),
vec![
"txBitrate",
"rxBitrate",
"txFPS",
"rxFPS",
"rxLost",
"txLost",
"jBLen",
"txLostPercent",
"rxLostPercent",
"txH239Bitrate",
"rxH239Bitrate",
"txVideoCodec",
"rxVideoCodec",
"txAudioCodec",
"rxAudioCodec",
"txResolution",
"rxResolution",
"txH239Resolution",
"rxH239Resolution",
"duration",
("txBitrate", "Скорость отправки данных"),
("rxBitrate", "Скорость приёма данных"),
("txFPS", "Количество отправляемых кадров в секунду"),
("rxFPS", "Количество получаемых кадров в секунду"),
("rxLost", "Количество потерянных пакетов на приёме"),
("txLost", "Количество потерянных пакетов на отправке"),
("jBLen", "Фазовое отклонение на цифровом канале"),
("txLostPercent", "Процент потерянных пакетов на отправке"),
("rxLostPercent", "Процент потерянных пакетов на приёме"),
("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"),
("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"),
("txVideoCodec", "Используемый видеокодек на отправке"),
("rxVideoCodec", "Используемый видеокодек на приёме"),
("txAudioCodec", "Используемый аудиокодек на отправке"),
("rxAudioCodec", "Используемый аудиокодек на приёме"),
("txResolution", "Разрешение передаваемого видео"),
("rxResolution", "Разрешение принимаемого видео"),
("txH239Resolution", "Разрешение дополнительного видео-потока"),
("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"),
("duration", "Длительсность сеанса (в минутах)"),
],
).await;
dbg!(metrics);
).await
.into_iter()
.filter(|metric| metric.is_some())
.map(|metric| metric.unwrap() )
.collect::<Vec<MetricOutputExtended>>()
);
}
}
// let metrics = Requester::get_metrics_from_jitter(jitter).await;
// if !metrics.metrics.is_empty() {
// match Exporter::export_extended_metrics(metrics).await {
// Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
// Err(er) => error!("Cannot export data: {}", er),
// }
// } else {
// warn!("Metrics array is empty. Ignoring exporting ...");
// }
// Exporter::export_extended_metrics(metrics)
if !buffer.is_empty() {
let metrics = PrometheusMetricsExtended::new_zvks(buffer).await;
match Exporter::export_extended_metrics(metrics).await {
Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
Err(er) => error!("Cannot export data: {}", er),
}
} else {
warn!("Metrics array is empty. Ignoring exporting ...");
}
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
// get users' jitter info