Merge pull request 'feature/135' (#27) from feature/135 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/27 Reviewed-by: DmitriyA <faleo1999@mail.ru>pull/28/head
commit
1d8c0d5fc2
|
|
@ -9,6 +9,9 @@ DB_DBNAME = "db_name"1
|
||||||
# Prometheus-Exporter info
|
# Prometheus-Exporter info
|
||||||
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
|
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
|
||||||
|
|
||||||
|
# VINTEO Jitter puller (needed to init Jitter native grab)
|
||||||
|
VINTEO_API_KEY = "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"
|
||||||
|
|
||||||
# Status Model API support
|
# Status Model API support
|
||||||
# > if exists, ignore `EXPORTER_URL` var
|
# > if exists, ignore `EXPORTER_URL` var
|
||||||
STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input"
|
STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input"
|
||||||
|
|
|
||||||
|
|
@ -26,4 +26,5 @@ rand = "0.9.0"
|
||||||
sysinfo = "0.33.1"
|
sysinfo = "0.33.1"
|
||||||
openssl = { version = "0.10", features = ["vendored"] }
|
openssl = { version = "0.10", features = ["vendored"] }
|
||||||
tracing-subscriber = "0.3.19"
|
tracing-subscriber = "0.3.19"
|
||||||
tracing = "0.1.41"
|
tracing = "0.1.41"
|
||||||
|
lazy_static = "1.5.0"
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,219 @@
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use integr_structs::api::v3::PrometheusMetricsExtended;
|
||||||
|
use reqwest::Client;
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use tracing::{error, info, warn};
|
||||||
|
use serde_json::{Value, from_str};
|
||||||
|
use crate::export::Exporter;
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences");
|
||||||
|
static ref USERS_ENDPOINT: String = String::from("/api/v1/participants/");
|
||||||
|
}
|
||||||
|
|
||||||
|
// conferences ids
|
||||||
|
type Conferences = HashSet<(String, String)>;
|
||||||
|
// hash map {CONFERENCE_ID - JITTER_VALUE}
|
||||||
|
type OutputConferences = HashMap<(String, String), u32>;
|
||||||
|
// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}}
|
||||||
|
type Jitter = HashMap<(String, String), OutputConferences>;
|
||||||
|
|
||||||
|
// to handle async http requests
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Requester {
|
||||||
|
base: String,
|
||||||
|
api_key: String,
|
||||||
|
client: Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Requester {
|
||||||
|
pub fn new() -> anyhow::Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
|
base: String::from("https://demo.vcs.vinteo.dev"),
|
||||||
|
api_key: std::env::var("VINTEO_API_KEY").unwrap(),
|
||||||
|
client: Client::builder().user_agent("api-grub").build()?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
pub async fn get_conferences(&self) -> anyhow::Result<Value> {
|
||||||
|
let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
|
||||||
|
let req = self.client.get(url)
|
||||||
|
.header("accept", "application/json")
|
||||||
|
.header("x-api-key", &self.api_key);
|
||||||
|
let resp = req.send().await;
|
||||||
|
Ok(from_str(&resp?.text().await?)?)
|
||||||
|
}
|
||||||
|
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
|
||||||
|
let mut hashset = Conferences::new();
|
||||||
|
match conferences.get("data") {
|
||||||
|
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
|
||||||
|
Some(data) => {
|
||||||
|
match data.get("conferences") {
|
||||||
|
None => return Err(anyhow::Error::msg("Invalid JSON format: no `conferences` field")),
|
||||||
|
Some(confs) => {
|
||||||
|
if let Some(confs) = confs.as_array() {
|
||||||
|
confs.iter()
|
||||||
|
.for_each(|conf_obj| {
|
||||||
|
if let Some(id) = conf_obj.get("number") {
|
||||||
|
if let Some(id) = id.as_str() {
|
||||||
|
let id = {
|
||||||
|
if let Some(desc) = conf_obj.get("description") {
|
||||||
|
match desc.as_str() {
|
||||||
|
Some(desc) => (desc.to_string(), id.to_string()),
|
||||||
|
None => (String::new(), id.to_string())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
(String::new(), id.to_string())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
hashset.insert(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
return Err(anyhow::Error::msg("Invalid JSON format: `conferences` is not an array"))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Ok(hashset)
|
||||||
|
}
|
||||||
|
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<Jitter> {
|
||||||
|
let mut output = Jitter::new();
|
||||||
|
for conf in conferences {
|
||||||
|
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
|
||||||
|
let req = self.client.get(url)
|
||||||
|
.header("accept", "application/json")
|
||||||
|
.header("x-api-key", &self.api_key);
|
||||||
|
|
||||||
|
match req.send().await {
|
||||||
|
Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er),
|
||||||
|
Ok(resp) => {
|
||||||
|
match resp.text().await {
|
||||||
|
Err(er) => error!("Cannot convert response into text: {}", er),
|
||||||
|
Ok(resp) => {
|
||||||
|
let resp: Value = from_str(&resp)?;
|
||||||
|
if let Some(data) = resp.get("data") {
|
||||||
|
if let Some(parts) = data.get("participants") {
|
||||||
|
if let Some(parts) = parts.as_array() {
|
||||||
|
for participant in parts {
|
||||||
|
// save id
|
||||||
|
let id = {
|
||||||
|
if let Some(id) = participant.get("id") {
|
||||||
|
let id = serde_json::to_string(id)?.replace("\"", "");
|
||||||
|
if let Some(watermark) = participant.get("watermark") {
|
||||||
|
match watermark.as_str() {
|
||||||
|
Some(watermark) => (watermark.to_string(), id),
|
||||||
|
None => (String::new(), id),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf);
|
||||||
|
(String::new(), id)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Some(params) = participant.get("params") {
|
||||||
|
let jitter = {
|
||||||
|
if let Some(jitter) = params.get("jBLen") {
|
||||||
|
// find substring
|
||||||
|
let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", "");
|
||||||
|
// string to u32
|
||||||
|
if let Ok(jitter) = temp.parse::<u32>() {
|
||||||
|
jitter
|
||||||
|
} else {
|
||||||
|
error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// hm push
|
||||||
|
output.entry(id)
|
||||||
|
.and_modify(|map| {
|
||||||
|
map.entry(conf.clone())
|
||||||
|
.and_modify(|val| *val = jitter)
|
||||||
|
.or_insert(jitter); } )
|
||||||
|
.or_insert( {
|
||||||
|
let mut hm = OutputConferences::new();
|
||||||
|
hm.insert(conf.clone(), jitter);
|
||||||
|
hm
|
||||||
|
} );
|
||||||
|
} else {
|
||||||
|
error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array"))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field"))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(anyhow::Error::msg("Invalid JSON format: no `data` field"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(output)
|
||||||
|
}
|
||||||
|
async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended {
|
||||||
|
use integr_structs::api::v3::MetricOutputExtended;
|
||||||
|
let mut metrics = PrometheusMetricsExtended::new_empty_jitter();
|
||||||
|
// for ((name, id), confs) in jitter {
|
||||||
|
// confs.iter()
|
||||||
|
// .for_each(|((conf_name, conf_id), &value)| {todo!()});
|
||||||
|
// }
|
||||||
|
jitter.iter()
|
||||||
|
.map(|(participant, confs)| (confs.iter(), participant))
|
||||||
|
.for_each(|(conf_iter, (name, id))| {
|
||||||
|
conf_iter.for_each(|((conf_name, conf_id), &jitter)| {
|
||||||
|
let metric_id = format!("j{}{}", id, conf_id);
|
||||||
|
let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id);
|
||||||
|
let val = serde_json::to_value(jitter).unwrap_or_else(|er| {
|
||||||
|
error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er);
|
||||||
|
Value::Null
|
||||||
|
});
|
||||||
|
metrics.add(MetricOutputExtended::new_with_slices(&metric_id, "int", "Vinteo native", &desc, val));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
metrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
|
||||||
|
// create requester
|
||||||
|
let requester = Requester::new()?;
|
||||||
|
// get conference id's
|
||||||
|
loop {
|
||||||
|
info!("Getting conferences array ...");
|
||||||
|
let conferences = requester.get_conferences().await?;
|
||||||
|
let conferences = Requester::get_conferences_from_value(conferences).await?;
|
||||||
|
|
||||||
|
info!("Getting jitter metrics by user in each conference ...");
|
||||||
|
let jitter = requester.get_users_jitter_by_conference(conferences).await?;
|
||||||
|
|
||||||
|
info!("Casting jitter metrics into Metrics format for Prometheus ...");
|
||||||
|
let metrics = Requester::get_metrics_from_jitter(jitter).await;
|
||||||
|
|
||||||
|
if !metrics.metrics.is_empty() {
|
||||||
|
match Exporter::export_extended_metrics(metrics).await {
|
||||||
|
Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
|
||||||
|
Err(er) => error!("Cannot export data: {}", er),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warn!("Metrics array is empty. Ignoring exporting ...");
|
||||||
|
}
|
||||||
|
// Exporter::export_extended_metrics(metrics)
|
||||||
|
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
||||||
|
}
|
||||||
|
// get users' jitter info
|
||||||
|
}
|
||||||
|
|
@ -4,6 +4,7 @@ mod logger;
|
||||||
mod json;
|
mod json;
|
||||||
mod export;
|
mod export;
|
||||||
mod monitoring;
|
mod monitoring;
|
||||||
|
mod jitter;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use integr_structs::api::v3::Config;
|
use integr_structs::api::v3::Config;
|
||||||
|
|
@ -12,7 +13,10 @@ use config::{pull_local_config, init_config_grub_mechanism};
|
||||||
use net::init_api_grub_mechanism;
|
use net::init_api_grub_mechanism;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
// ENODE_MONITORING_IP
|
||||||
use monitoring::get_metrics_from_monitoring;
|
use monitoring::get_metrics_from_monitoring;
|
||||||
|
// VINTEO_API_KEY
|
||||||
|
use jitter::init_grubbing_jitter;
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
async fn main() -> Result<()>{
|
async fn main() -> Result<()>{
|
||||||
|
|
@ -33,23 +37,23 @@ async fn main() -> Result<()>{
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
let event_grub = tokio::spawn(async move {
|
let event_grub = tokio::spawn(async move {
|
||||||
|
// GRAB USING eNODE.MONITORING API GATEWAY
|
||||||
if std::env::var("ENODE_MONITORING_IP").is_ok() {
|
if std::env::var("ENODE_MONITORING_IP").is_ok() {
|
||||||
match get_metrics_from_monitoring(0, 5).await {
|
match get_metrics_from_monitoring(0, 5).await {
|
||||||
Ok(_) => {
|
Ok(_) => info!("Grabing (eNODE.Monitoring) task de-initialized"),
|
||||||
info!("Grabing (eNODE.Monitoring) task deinitialized");
|
Err(er) => error!("Grabing task returned an error : {}", er),
|
||||||
},
|
|
||||||
Err(er) => {
|
|
||||||
error!("Grabing task returned an error : {}", er);
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
// JITTER NATIVE GRAB TASK
|
||||||
|
} else if std::env::var("VINTEO_API_KEY").is_ok() {
|
||||||
|
match init_grubbing_jitter().await {
|
||||||
|
Ok(_) => info!("Grabing (Vinteo - Jitter native) task de-initialized"),
|
||||||
|
Err(er) => error!("Jitter grabing mechanism crushed : {}", er),
|
||||||
|
}
|
||||||
|
// NATIVE GRAB TASK USING `config_api.json`
|
||||||
} else {
|
} else {
|
||||||
match init_api_grub_mechanism(config, &mut rx).await {
|
match init_api_grub_mechanism(config, &mut rx).await {
|
||||||
Ok(_) => {
|
Ok(_) => info!("Grabing task de-initialized"),
|
||||||
info!("Grabing task deinitialized");
|
Err(er) => error!("Grabing task returned an error : {}", er),
|
||||||
},
|
|
||||||
Err(er) => {
|
|
||||||
error!("Grabing task returned an error : {}", er);
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -321,6 +321,16 @@ pub mod v3 {
|
||||||
pub metrics: Vec<MetricOutputExtended>,
|
pub metrics: Vec<MetricOutputExtended>,
|
||||||
}
|
}
|
||||||
impl PrometheusMetricsExtended {
|
impl PrometheusMetricsExtended {
|
||||||
|
pub fn new_empty_jitter() -> Self {
|
||||||
|
Self {
|
||||||
|
service_name : "zvks".to_owned(),
|
||||||
|
endpoint_name : "jitter".to_owned(),
|
||||||
|
metrics : Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn add(&mut self, metric: MetricOutputExtended) {
|
||||||
|
self.metrics.push(metric);
|
||||||
|
}
|
||||||
pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
|
pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
service_name : "zvks".to_owned(),
|
service_name : "zvks".to_owned(),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue