adding tracing
test-org/integration-module/pipeline/pr-rc This commit looks good Details

pull/36/head
prplV 2025-05-27 02:43:21 -04:00
parent 7cc7c0799a
commit 66b66b966b
1 changed files with 7 additions and 1 deletions

View File

@ -3,7 +3,7 @@ use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended};
use std::sync::Arc; use std::sync::Arc;
use reqwest::Client; use reqwest::Client;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tracing::{error, info, warn}; use tracing::{error, info, trace, warn};
use std::pin::Pin; use std::pin::Pin;
use serde_json::{Value, from_str}; use serde_json::{Value, from_str};
use crate::export::Exporter; use crate::export::Exporter;
@ -55,6 +55,7 @@ impl Requester {
}) })
} }
pub async fn get_conferences(&self) -> anyhow::Result<Value> { pub async fn get_conferences(&self) -> anyhow::Result<Value> {
trace!("getting conferences list from API");
let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT); let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
let req = self.client.get(url) let req = self.client.get(url)
.timeout(tokio::time::Duration::from_secs(10)) .timeout(tokio::time::Duration::from_secs(10))
@ -64,6 +65,7 @@ impl Requester {
Ok(from_str(&resp?.text().await?)?) Ok(from_str(&resp?.text().await?)?)
} }
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> { pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
trace!("extracting conferences list");
let mut hashset = Conferences::new(); let mut hashset = Conferences::new();
match conferences.get("data") { match conferences.get("data") {
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")), None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
@ -100,6 +102,7 @@ impl Requester {
Ok(hashset) Ok(hashset)
} }
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> { pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> {
trace!("getting users list for each conference from API ...");
let mut output = OutputConferences::new(); let mut output = OutputConferences::new();
for conf in conferences { for conf in conferences {
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1); let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
@ -127,6 +130,7 @@ impl Requester {
fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ { fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ {
trace!("fetching participants for conf and going across ...");
let participants_iter = conf let participants_iter = conf
.get("data") .get("data")
.and_then(|data| data.get("participants")) .and_then(|data| data.get("participants"))
@ -143,6 +147,7 @@ impl Requester {
} }
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> { async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> {
trace!("extracting {} measures for current participant in current conference {} ...", targets.len(), conf_id);
let mut futures: Vec<Pin<Box<dyn futures::Future<Output = Option<MetricOutputExtended>> + Send>>> = Vec::new(); let mut futures: Vec<Pin<Box<dyn futures::Future<Output = Option<MetricOutputExtended>> + Send>>> = Vec::new();
for (target, description) in targets { for (target, description) in targets {
futures.push(Box::pin(extract_from_participant( futures.push(Box::pin(extract_from_participant(
@ -182,6 +187,7 @@ async fn extract_total_anon_participants(conferences : Arc<Value>) -> Value {
// GET participants/{conference_id} data.participants[].isAnonymous // GET participants/{conference_id} data.participants[].isAnonymous
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> { async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> {
trace!("extracting `{}` measure for current participant ...", target);
if let Some(value) = participant.get("params") if let Some(value) = participant.get("params")
.and_then(|params| params.get(target)) { .and_then(|params| params.get(target)) {
let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown"); let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");