diff --git a/crates/api-grub/src/jitter.rs b/crates/api-grub/src/jitter.rs index 1ab47c7..19899c4 100644 --- a/crates/api-grub/src/jitter.rs +++ b/crates/api-grub/src/jitter.rs @@ -17,12 +17,7 @@ lazy_static! { // conferences ids type Conferences = HashSet<(String, String)>; -// all participants per each conference -// type AllConferencesInfo = Vec; -// hash map {CONFERENCE_ID - CONFERENCE_VALUE} type OutputConferences = HashMap<(String, String), Value>; -// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}} -// type Jitter = HashMap<(String, String), OutputConferences>; #[derive(Debug, Clone)] struct ConferencesDigital(OutputConferences); @@ -40,10 +35,6 @@ impl ConferencesDigital { } } } - pub fn clone_confs(&self) -> OutputConferences { - self.0.clone() - } - } @@ -151,14 +142,15 @@ impl Requester { } } - async fn gather_futures(parts: Arc, conf_id: Arc, conf_desc: Arc, targets: Vec<&str>) -> Vec> { + async fn gather_futures(parts: Arc, conf_id: Arc, conf_desc: Arc, targets: Vec<(&str, &str)>) -> Vec> { let mut futures: Vec> + Send>>> = Vec::new(); - for target in targets { + for (target, description) in targets { futures.push(Box::pin(extract_from_participant( parts.clone(), target, conf_id.clone(), - conf_desc.clone() + conf_desc.clone(), + description ))); } future::join_all(futures).await @@ -173,21 +165,39 @@ async fn extract_total_participants(conferences : Arc) -> Value { } Value::Null } + +async fn extract_total_anon_participants(conferences : Arc) -> Value { + let sum = conferences + .get("data") + .and_then(|data| data.get("participants")) + .and_then(|parts| parts.as_array()) + .iter() + .flat_map(|&val| val.iter()) + .filter_map(|part| part.get("isAnonymous")) + .filter_map(|is_anon| is_anon.as_bool()) + .filter(|is_anon| *is_anon == true) + .count(); + Value::Number(sum.into()) +} + // GET participants/{conference_id} data.participants[].isAnonymous -async fn extract_from_participant(participant : Arc, target: &str, conf_id: Arc, conf_desc: Arc) -> Option { +async fn extract_from_participant(participant : Arc, target: &str, conf_id: Arc, conf_desc: Arc, description: &str) -> Option { if let Some(value) = participant.get("params") .and_then(|params| params.get(target)) { + let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown"); + let id = participant.get("id").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown"); + let metric_name = format!("{}_{}_{}", target, conf_id, id); return Some( MetricOutputExtended::new_with_slices( - target, - target, + &metric_name, + &metric_name, "type", "Vinteo native", format!( - "Значение параметра `{}` для пользователя {} ({}) в конференции {} ({})", - target, - participant.get("watermark").unwrap_or_else(|| &Value::Null), - participant.get("id").unwrap_or_else(|| &Value::Null), + "{} для пользователя {} ({}) в конференции {} ({})", + description, + name, + id, conf_desc, conf_id ).as_ref(), @@ -214,18 +224,17 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> { info!("Initializing extraction mechanism ..."); let conferences = ConferencesDigital::new(confs); - let full_iterable = Arc::new(conferences.clone_confs()); let extracted_conference = conferences.go_across(); - let mut buffer: Vec = Vec::new(); + let mut buffer: Vec = Vec::new(); pin_mut!(extracted_conference); - while let Some(((id, desc), item)) = extracted_conference.next().await { + while let Some(((desc, id), item)) = extracted_conference.next().await { let parts_stream = across_participants(&item); pin_mut!(parts_stream); let total_participants = MetricOutputExtended::new_with_slices( - format!("TotalParticipantsIn{}", id).as_ref(), - format!("TotalParticipantsIn{}", id).as_ref(), + format!("TotalParticipants_{}", id).as_ref(), + format!("TotalParticipants_{}", id).as_ref(), "type", "Vinteo Native", format!("Общее количество участников в конференции {}", &desc).as_ref(), @@ -233,53 +242,70 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> { Some(String::from("module$12")), extract_total_participants(item.clone()).await ); + // anon NON_STABLE + let total_anon_participants = MetricOutputExtended::new_with_slices( + format!("TotalAnonymousParticipants_{}", id).as_ref(), + format!("TotalAnonymousParticipants_{}", id).as_ref(), + "type", + "Vinteo Native", + format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(), + Some(18), + Some(String::from("module$12")), + extract_total_anon_participants(item.clone()).await + ); + // description - // dbg!(total_participants); + buffer.push(total_participants); + buffer.push(total_anon_participants); while let Some(participant) = parts_stream.next().await { - let metrics = gather_futures( + buffer.append( + &mut gather_futures( participant, Arc::from(id.as_ref()), Arc::from(desc.as_ref()), vec![ - "txBitrate", - "rxBitrate", - "txFPS", - "rxFPS", - "rxLost", - "txLost", - "jBLen", - "txLostPercent", - "rxLostPercent", - "txH239Bitrate", - "rxH239Bitrate", - "txVideoCodec", - "rxVideoCodec", - "txAudioCodec", - "rxAudioCodec", - "txResolution", - "rxResolution", - "txH239Resolution", - "rxH239Resolution", - "duration", + ("txBitrate", "Скорость отправки данных"), + ("rxBitrate", "Скорость приёма данных"), + ("txFPS", "Количество отправляемых кадров в секунду"), + ("rxFPS", "Количество получаемых кадров в секунду"), + ("rxLost", "Количество потерянных пакетов на приёме"), + ("txLost", "Количество потерянных пакетов на отправке"), + ("jBLen", "Фазовое отклонение на цифровом канале"), + ("txLostPercent", "Процент потерянных пакетов на отправке"), + ("rxLostPercent", "Процент потерянных пакетов на приёме"), + ("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"), + ("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"), + ("txVideoCodec", "Используемый видеокодек на отправке"), + ("rxVideoCodec", "Используемый видеокодек на приёме"), + ("txAudioCodec", "Используемый аудиокодек на отправке"), + ("rxAudioCodec", "Используемый аудиокодек на приёме"), + ("txResolution", "Разрешение передаваемого видео"), + ("rxResolution", "Разрешение принимаемого видео"), + ("txH239Resolution", "Разрешение дополнительного видео-потока"), + ("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"), + ("duration", "Длительсность сеанса (в минутах)"), ], - ).await; - dbg!(metrics); + ).await + .into_iter() + .filter(|metric| metric.is_some()) + .map(|metric| metric.unwrap() ) + .collect::>() + ); } } // let metrics = Requester::get_metrics_from_jitter(jitter).await; - // if !metrics.metrics.is_empty() { - // match Exporter::export_extended_metrics(metrics).await { - // Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), - // Err(er) => error!("Cannot export data: {}", er), - // } - // } else { - // warn!("Metrics array is empty. Ignoring exporting ..."); - // } - // Exporter::export_extended_metrics(metrics) - + if !buffer.is_empty() { + let metrics = PrometheusMetricsExtended::new_zvks(buffer).await; + match Exporter::export_extended_metrics(metrics).await { + Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes), + Err(er) => error!("Cannot export data: {}", er), + } + } else { + warn!("Metrics array is empty. Ignoring exporting ..."); + } tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } // get users' jitter info