Compare commits
2 Commits
f71f38c666
...
97369e6452
| Author | SHA1 | Date |
|---|---|---|
|
|
97369e6452 | |
|
|
0d449468de |
|
|
@ -26,4 +26,5 @@ rand = "0.9.0"
|
|||
sysinfo = "0.33.1"
|
||||
openssl = { version = "0.10", features = ["vendored"] }
|
||||
tracing-subscriber = "0.3.19"
|
||||
tracing = "0.1.41"
|
||||
tracing = "0.1.41"
|
||||
lazy_static = "1.5.0"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,168 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
use reqwest::Client;
|
||||
use lazy_static::lazy_static;
|
||||
use tracing::{info, error};
|
||||
use serde_json::{Value, from_str};
|
||||
|
||||
|
||||
lazy_static! {
|
||||
static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences");
|
||||
static ref USERS_ENDPOINT: String = String::from("/api/v1/participants/");
|
||||
}
|
||||
|
||||
// conferences ids
|
||||
type Conferences = HashSet<String>;
|
||||
// hash map {CONFERENCE_ID - JITTER_VALUE}
|
||||
type OutputConferences = HashMap<String, u32>;
|
||||
// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}}
|
||||
type Output = HashMap<String, OutputConferences>;
|
||||
|
||||
// to handle async http requests
|
||||
#[derive(Debug)]
|
||||
struct Requester {
|
||||
base: String,
|
||||
api_key: String,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl Requester {
|
||||
pub fn new() -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
base: String::from("https://demo.vcs.vinteo.dev"),
|
||||
api_key: std::env::var("VINTEO_API_KEY").unwrap(),
|
||||
client: Client::builder().user_agent("api-grub").build()?,
|
||||
})
|
||||
}
|
||||
pub async fn get_conferences(&self) -> anyhow::Result<Value> {
|
||||
let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
|
||||
let req = self.client.get(url)
|
||||
.header("accept", "application/json")
|
||||
.header("x-api-key", &self.api_key);
|
||||
let resp = req.send().await;
|
||||
Ok(from_str(&resp?.text().await?)?)
|
||||
}
|
||||
pub async fn get_conferences_from_value(mut conferences: Value) -> anyhow::Result<Conferences> {
|
||||
let mut hashset = Conferences::new();
|
||||
match conferences.get("data") {
|
||||
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
|
||||
Some(data) => {
|
||||
match data.get("conferences") {
|
||||
None => return Err(anyhow::Error::msg("Invalid JSON format: no `conferences` field")),
|
||||
Some(confs) => {
|
||||
if let Some(confs) = confs.as_array() {
|
||||
confs.iter()
|
||||
.for_each(|conf_obj| {
|
||||
if let Some(id) = conf_obj.get("number") {
|
||||
if let Some(id) = id.as_str() {
|
||||
hashset.insert(id.to_string());
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
return Err(anyhow::Error::msg("Invalid JSON format: `conferences` is not an array"))
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
Ok(hashset)
|
||||
}
|
||||
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<Output> {
|
||||
let mut output = Output::new();
|
||||
for conf in conferences {
|
||||
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf);
|
||||
let req = self.client.get(url)
|
||||
.header("accept", "application/json")
|
||||
.header("x-api-key", &self.api_key);
|
||||
|
||||
match req.send().await {
|
||||
Err(er) => error!("Cannot GET data about conference - {}: {}", conf, er),
|
||||
Ok(resp) => {
|
||||
match resp.text().await {
|
||||
Err(er) => error!("Cannot convert response into text: {}", er),
|
||||
Ok(resp) => {
|
||||
let resp: Value = from_str(&resp)?;
|
||||
if let Some(data) = resp.get("data") {
|
||||
if let Some(parts) = data.get("participants") {
|
||||
if let Some(parts) = parts.as_array() {
|
||||
for participant in parts {
|
||||
// save id
|
||||
let id = {
|
||||
if let Some(id) = participant.get("id") {
|
||||
serde_json::to_string(id)?.replace("\"", "")
|
||||
} else {
|
||||
error!("No `id` field was found in participant from {} conference. Skipping ...", &conf);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Some(params) = participant.get("params") {
|
||||
let jitter = {
|
||||
if let Some(jitter) = params.get("jBLen") {
|
||||
// find substring
|
||||
dbg!(&jitter);
|
||||
let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", "");
|
||||
// string to u32
|
||||
dbg!(&temp);
|
||||
if let Ok(jitter) = temp.parse::<u32>() {
|
||||
jitter
|
||||
} else {
|
||||
error!("Invalid type of `jBLen` field for participant {} in conference {}. Skipping ...", &id, &conf);
|
||||
continue;
|
||||
}
|
||||
|
||||
} else {
|
||||
error!("No `jBLen` field was found in participant {} from conference {}. Skipping ...", &id, &conf);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// hm push
|
||||
output.entry(id)
|
||||
.and_modify(|map| {
|
||||
map.entry(conf.clone())
|
||||
.and_modify(|val| *val = jitter)
|
||||
.or_insert(jitter); } )
|
||||
.or_insert( {
|
||||
let mut hm = OutputConferences::new();
|
||||
hm.insert(conf.clone(), jitter);
|
||||
hm
|
||||
} );
|
||||
} else {
|
||||
error!("No `params` field in participant {} from conference {}. Skipping ...", &id, &conf);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array"))
|
||||
}
|
||||
} else {
|
||||
return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field"))
|
||||
}
|
||||
} else {
|
||||
return Err(anyhow::Error::msg("Invalid JSON format: no `data` field"))
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
Ok(output)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
|
||||
// create requester
|
||||
let requester = Requester::new()?;
|
||||
// get conference id's
|
||||
loop {
|
||||
let conferences = requester.get_conferences().await?;
|
||||
let conferences = Requester::get_conferences_from_value(conferences).await?;
|
||||
// conferences.iter()
|
||||
// .for_each(|id| {
|
||||
// requester.get_users_jitter_by_conference(conference_id)
|
||||
// });
|
||||
let jitter = requester.get_users_jitter_by_conference(conferences).await?;
|
||||
dbg!(jitter);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
||||
}
|
||||
// get users' jitter info
|
||||
}
|
||||
|
|
@ -4,6 +4,7 @@ mod logger;
|
|||
mod json;
|
||||
mod export;
|
||||
mod monitoring;
|
||||
mod jitter;
|
||||
|
||||
use anyhow::Result;
|
||||
use integr_structs::api::v3::Config;
|
||||
|
|
@ -12,7 +13,10 @@ use config::{pull_local_config, init_config_grub_mechanism};
|
|||
use net::init_api_grub_mechanism;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{error, info, warn};
|
||||
// ENODE_MONITORING_IP
|
||||
use monitoring::get_metrics_from_monitoring;
|
||||
// VINTEO_API_KEY
|
||||
use jitter::init_grubbing_jitter;
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
async fn main() -> Result<()>{
|
||||
|
|
@ -33,23 +37,23 @@ async fn main() -> Result<()>{
|
|||
}
|
||||
});
|
||||
let event_grub = tokio::spawn(async move {
|
||||
// GRAB USING eNODE.MONITORING API GATEWAY
|
||||
if std::env::var("ENODE_MONITORING_IP").is_ok() {
|
||||
match get_metrics_from_monitoring(0, 5).await {
|
||||
Ok(_) => {
|
||||
info!("Grabing (eNODE.Monitoring) task deinitialized");
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Grabing task returned an error : {}", er);
|
||||
},
|
||||
Ok(_) => info!("Grabing (eNODE.Monitoring) task de-initialized"),
|
||||
Err(er) => error!("Grabing task returned an error : {}", er),
|
||||
}
|
||||
// JITTER NATIVE GRAB TASK
|
||||
} else if std::env::var("VINTEO_API_KEY").is_ok() {
|
||||
match init_grubbing_jitter().await {
|
||||
Ok(_) => info!("Grabing (Vinteo - Jitter native) task de-initialized"),
|
||||
Err(er) => error!("Jitter grabing mechanism crushed : {}", er),
|
||||
}
|
||||
// NATIVE GRAB TASK USING `config_api.json`
|
||||
} else {
|
||||
match init_api_grub_mechanism(config, &mut rx).await {
|
||||
Ok(_) => {
|
||||
info!("Grabing task deinitialized");
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Grabing task returned an error : {}", er);
|
||||
},
|
||||
Ok(_) => info!("Grabing task de-initialized"),
|
||||
Err(er) => error!("Grabing task returned an error : {}", er),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
|||
Loading…
Reference in New Issue