Compare commits

..

26 Commits

Author SHA1 Message Date
deployer3000 db810c178e Merge pull request 'rc' (#53) from rc into master 2025-07-04 15:18:24 +03:00
Vladislav Drozdov 3657720074 Merge pull request 'feature/device-out' (#51) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/51
2025-06-25 17:22:49 +03:00
prplV 0ba5ff0e9f docs ready
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-25 10:19:42 -04:00
prplV 24e486410e fixed Dockerfile 2025-06-25 09:43:15 -04:00
Vladislav Drozdov e72310f8ab Merge pull request 'for buildings' (#49) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master There was a failure building this commit Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/49
2025-06-23 10:36:03 +03:00
prplV 5c62677ca5 for buildings
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-23 03:25:23 -04:00
Vladislav Drozdov e4b8cf2d54 Merge pull request 'for Jenkins test' (#48) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master There was a failure building this commit Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/48
2025-06-16 10:30:16 +03:00
prplV eaa551be50 for test
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-16 03:27:06 -04:00
Vladislav Drozdov 903ace3197 Merge pull request 'another ref' (#45) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master There was a failure building this commit Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/45
2025-06-11 15:42:40 +03:00
prplV 91acb6da23 another ref
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-11 08:41:02 -04:00
Vladislav Drozdov 0a1324b28a Merge pull request '+ api token + ref' (#43) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master There was a failure building this commit Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/43
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-06-11 13:29:45 +03:00
prplV e3761f5513 + api token + ref
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-11 05:06:46 -04:00
deployer3000 b1196d5177 Merge pull request 'rc' (#42) from rc into master
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-06 16:06:03 +03:00
Vladislav Drozdov 5de11c05ff Merge pull request 'add config delay' (#41) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/41
2025-06-06 16:01:39 +03:00
prplV 0cf7565dff add config delay
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-06 09:00:05 -04:00
deployer3000 fd813b04b6 Merge pull request 'rc' (#40) from rc into master 2025-05-27 12:33:36 +03:00
Vladislav Drozdov c8bf5e7cd9 Merge pull request 'feature/178' (#39) from feature/178 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/39
2025-05-27 12:28:46 +03:00
deployer3000 04b4ffac2d Merge pull request 'Merge pull request 'feature/178' (#36) from feature/178 into rc' (#38) from rc into feature/178
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-05-27 12:18:50 +03:00
prplV 2f1de2b4f4 env vars added 2025-05-27 05:12:20 -04:00
deployer3000 dfe52ddc49 Merge pull request 'rc' (#37) from rc into master 2025-05-27 11:49:08 +03:00
Vladislav Drozdov f360b8158f Merge pull request 'feature/178' (#36) from feature/178 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
test-org/integration-module/pipeline/pr-feature/178 Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/36
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-05-27 11:43:53 +03:00
prplV 66b66b966b adding tracing
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-05-27 02:43:21 -04:00
prplV 7cc7c0799a fix and finish 2025-05-26 13:28:15 -04:00
prplV af604c55a6 metrics vinteo native update 2025-05-26 09:19:57 -04:00
deployer3000 03c9a12ffa Merge pull request 'rc' (#35) from rc into master 2025-05-20 12:30:28 +03:00
Vladislav Drozdov f4b560a454 Merge pull request 'feature/160' (#34) from feature/160 into rc
test-org/integration-module/pipeline/pr-master Build started... Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/34
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-05-20 12:26:32 +03:00
9 changed files with 442 additions and 169 deletions

4
.dockerignore Normal file
View File

@ -0,0 +1,4 @@
/target
Cargo.lock
*.sock
.env

View File

@ -1,15 +1,21 @@
# Template .env for API grabber
# PostgreSQL connection [DEPRECATED]
# -------------------------------
DB_HOST = "ip.addr.postgresql.server"
DB_USER = "db_user"
DB_PASSWORD = "db_user_password"
DB_DBNAME = "db_name"1
# Prometheus-Exporter info
# -------------------------------
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
# VINTEO Jitter puller (needed to init Jitter native grab)
# -------------------------------
VINTEO_URL_BASE = "http(s)://ip.ip.ip.ip:port"
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/to/something"
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/to/something"
VINTEO_API_KEY = "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"
# Status Model API support
@ -17,14 +23,25 @@ VINTEO_API_KEY = "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8
STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input"
# eNODE.Monitoring configuration
# -------------------------------
# eNODE.Monitoring server IP
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# eNODE.Monitoring credentials
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required
ENODE_TARGET_DEVICES = "device$18,device$19"
# List of target devices
ENODE_TARGET_DEVICES = "18, 19"
# to work with unlimit API-Token
ENODE_API_TOKEN = "sssswwwwaaaaffff"
# OPTIONAL SETTINGS
# -------------------------------
# IM configuration for max level of logging info
# for example DEBUG, INFO, WARN, ERROR, TRACE
IM_LOG_INFO = "INFO"
# IM configuration for setting up API connetion
# timeout (in secs). Default value - 10
IM_CONNECTION_TIMEOUT = "10"
# IM configuration for delay of requests
# delay (in secs). Default value - 5
IM_REQUEST_DELAY = "20"

View File

@ -5,7 +5,11 @@ RUN apt update && apt install -y musl-tools
RUN rustup target add x86_64-unknown-linux-musl
COPY . .
RUN cargo test
ENV CARGO_HTTP_DEBUG=true
ENV CARGO_HTTP_TIMEOUT=100
RUN cargo test --verbose
RUN cargo build --release --target=x86_64-unknown-linux-musl
FROM alpine:latest

158
README.md
View File

@ -1,39 +1,173 @@
# Интеграционный модуль для проекта "Буревестник ВКС"
## Описание
`integr_mod` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
`Интеграционный модуль (ИМ)` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
- получение данных через API ВКС
- поддержку хранения, валидации и актуализации собственных конфигураций
- предобработку полученных данных и ~~отправку `Системе Мониторинга`~~ сохранение в БД
- предобработку полученных данных и сохранение в БД
- интеграции с `еНОД.Мониторинг`
## Специфика работы
На даннный момент предусмотрено два режима работы:
1) **Нативный** - режим работы, производящий прямой опрос сервиса видео-конференц связи `Vinteo` и соотвествующий процесс `ETL`
2) **Статичный** - режим работы *"посредник"*, когда все метрические данные ВКС `Vinteo` получаются через `REST-Full API` средства `еНОД.Мониторинг`
3) **Системный** - аналогичный **статичному** режиму, но метрические данные (заведомо обогащенные нулевым статусом) отправялются не напрямую в модуль `Prometehus exporter`, а в `Статусную модель`
4) **Vinteo** - особый режим работы, предполагающий сбор определенного набора метрик напрямую с ВКС `Vinteo` механизмом многоэтапного `API-запроса`
> **Примечание**
По стандарту `ИМ` работает в **НАТИВНОМ** режиме и ожидает конфигурации в формате `.json`, однако приоритетным считается **СТАТИЧНЫЙ** режим. Подробная информация о настройке в пункте `Руководство`
## Руководство
1. Заполнить .env файл или установить переменные окружения в соотвествии с примером в `.env.example` файле
В данном разделе опсиан алгоритм настройки, сборки и запуска программного модуля `ИМ`
### Преднастройка
1. Выбор режима работы модуля, который скорректирует принцип настройки:
| Режим работы | .env | config-api.json | $STATUS_SYSTEM_URL | $EXPORTER_URL |
|---|---|---|---|---|
| Нативный | ❌ | ✅ | ❌ | ❌ |
| Статичный | ✅ | ❌ | ❌❌❌ | ✅ |
| Системный | ✅ | ❌ | ✅ | ❌ |
, где:
✅ -- следует настроить (предпринять)
❌ -- игнорируется системой, не стоит настраивать
❌❌❌ -- **НЕЛЬЗЯ** настраивать (предпринимать), возможны ошибки в работе
> Режим работы `Vinteo` *не описан* в таблице **намеренно**
### Настройка режима работы "Нативный"
Для настройки данного режима необходимо расположить в **активной** директории конфигурационный `config_api.json` файл:
``` json
{
"config": [
{
"id":"zvks",
"login" : "",
"pass" : "",
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
"period" : "",
"timeout" : "5",
"metrics" : [
{
"name": "conferences",
"url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
"measure": [
{ "id":"number", "type": "text", "addr": "data.conferences[].number" },
{ "id":"total", "type": "integer", "addr": "data.total" },
{ "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
{ "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
{ "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
]
},
{
"name": "abonents",
"url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
"measure": [
{ "id":"total", "type": "integer", "addr": "data.total" }
]
}
]
}
]
}
```
> **Примечание**
Название конфигурационного файла должно быть как в примере - `config_api.json`
### Настройка режима работы "Статичный"
Для настройки данного режима необходимо пополнить данными о сервере в `.env` файле по примеру:
``` toml
# Template .env for API grabber
# Prometheus-Exporter info
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
...
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port" # <--- экспорт данных (обязательно)
# eNODE.Monitoring configuration
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"# ---> получение данных
# admin password is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
...
```
### Настройка режима работы "Системный"
Для настройки данного режима необходимо пополнить данными о сервере в `.env` файле по примеру:
``` toml
...
STATUS_SYSTEM_URL = "http(s)://{ip}:{port}/api/input"# <--- экспорт данных
# eNODE.Monitoring configuration
ENODE_MONITORING_IP = "{ip}.{ip}.{ip}.{ip}"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"# ---> получение данных
# admin password is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
...
```
### Настройка режима работы "Vinteo"
Для работы в данном режиме необходимо установить переменные окружения в соотвествии со списком ниже
``` toml
...
VINTEO_URL_BASE = "https://demo.vcs.vinteo.dev"
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/conferences"
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/participants/"
VINTEO_API_KEY = "00000000000111111111.aaaaaaaaaaaaaaabbbbbbbbbbbbb"
...
```
### Настройка экспорта полученных и обработанных данных
Настройка *точки выхода* для полученных и обработанных метрик определеяется установленными в переменных окружения параметрами, варианта два:
1) **Экспорт в статусную модель** в рамках механизма сквозного прохода данных в проекте `Буревестник ВКС`
``` toml
...
STATUS_SYSTEM_URL = "{BASE_URL}/{ROUTE}"
...
```
2) **Экспорт в экспортер или иной потребитель данных**
``` toml
...
EXPORTER_URL = "{BASE_URL}/{ROUTE}"
...
```
> ### **ОЧЕНЬ ВАЖНОЕ ПРИМЕЧАНИЕ**
> ---
> Одновременное использование `$STATUS_SYSTEM_URL` и `$EXPORTER_URL` **НЕДОПУСТИМО** !! Вариант со ссылкой **на статусную модель** является _по стандарту_ **БОЛЕЕ ПРИОРИТЕТНЫМ**, второй затрется, использовать необходимо только один
2. Произвести сборку проекта командой :
``` bash
cargo build --release
```
3. Запустить
> Debug версия
> **Debug** версия
``` bash
cargo run --bin api-grub
```
или
> Release версия
> **Release** версия
``` bash
cargo run --release --bin api-grub
```
@ -42,6 +176,6 @@ cargo run --release --bin api-grub
| Крейт (подмодуль) | Прогресс |
|---|---|
|`api-grub` | ✅✅✅✅✅✅✅✅✅🛠️ |
|`config-delivery [migrated]` | ❌❌❌❌❌❌❌❌❌❌ |
|`config-delivery` [migrated] | ❌❌❌❌❌❌❌❌❌❌ |
|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
|`preproc` [temp-deprecated] | ❌❌❌❌❌❌❌❌❌❌ | (разработка временно остановлена)

View File

@ -1,6 +1,6 @@
[package]
name = "api-grub"
version = "1.0.11"
version = "1.0.15"
edition = "2021"
authors = ["Vladislav Drozdov <maseeeeeeeed@gmail.com>"]
description = "API poller for ZVKS project"
@ -28,3 +28,5 @@ openssl = { version = "0.10", features = ["vendored"] }
tracing-subscriber = "0.3.19"
tracing = "0.1.41"
lazy_static = "1.5.0"
futures = "0.3.31"
async-stream = "0.3.6"

View File

@ -1,22 +1,43 @@
use std::collections::{HashMap, HashSet};
use integr_structs::api::v3::PrometheusMetricsExtended;
use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended};
use std::sync::Arc;
use reqwest::Client;
use lazy_static::lazy_static;
use tracing::{error, info, warn};
use tracing::{error, info, trace, warn};
use std::pin::Pin;
use serde_json::{Value, from_str};
use crate::export::Exporter;
use async_stream::stream;
use futures::{future, pin_mut, stream::Stream, StreamExt};
lazy_static! {
static ref CONFERENCES_ENDPOINT: String = String::from("/api/v1/conferences");
static ref USERS_ENDPOINT: String = String::from("/api/v1/participants/");
static ref VINTEO_BASE: String = std::env::var("VINTEO_URL_BASE").unwrap_or_else(|_| String::from("https://demo.vcs.vinteo.dev"));
static ref CONFERENCES_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_CONFERENCES").unwrap_or_else(|_| String::from("/api/v1/conferences"));
static ref USERS_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_PARTICIPANTS").unwrap_or_else(|_| String::from("/api/v1/participants/"));
}
// conferences ids
type Conferences = HashSet<(String, String)>;
// hash map {CONFERENCE_ID - JITTER_VALUE}
type OutputConferences = HashMap<(String, String), u32>;
// hash map {PARTICIPANT_ID - {CONFERENCE_ID - JITTER_VALUE}}
type Jitter = HashMap<(String, String), OutputConferences>;
type OutputConferences = HashMap<(String, String), Value>;
#[derive(Debug, Clone)]
struct ConferencesDigital(OutputConferences);
impl ConferencesDigital {
pub fn new(item: OutputConferences) -> Self {
Self(item)
}
pub fn go_across(self) -> impl Stream<Item = ((String, String), Arc<Value>)> {
stream! {
for (key, value) in self.0 {
info!("Conference {} ({}) is processing ...", key.0, key.1);
yield (key, Arc::new(value));
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
}
}
// to handle async http requests
#[derive(Debug)]
@ -29,12 +50,13 @@ struct Requester {
impl Requester {
pub fn new() -> anyhow::Result<Self> {
Ok(Self {
base: String::from("https://demo.vcs.vinteo.dev"),
api_key: std::env::var("VINTEO_API_KEY").unwrap(),
base: VINTEO_BASE.clone().to_owned(),
api_key: std::env::var("VINTEO_API_KEY")?,
client: Client::builder().user_agent("api-grub").build()?,
})
}
pub async fn get_conferences(&self) -> anyhow::Result<Value> {
trace!("getting conferences list from API");
let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
let req = self.client.get(url)
.timeout(tokio::time::Duration::from_secs(10))
@ -44,6 +66,7 @@ impl Requester {
Ok(from_str(&resp?.text().await?)?)
}
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
trace!("extracting conferences list");
let mut hashset = Conferences::new();
match conferences.get("data") {
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
@ -79,8 +102,9 @@ impl Requester {
}
Ok(hashset)
}
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<Jitter> {
let mut output = Jitter::new();
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> {
trace!("getting users list for each conference from API ...");
let mut output = OutputConferences::new();
for conf in conferences {
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
let req = self.client.get(url)
@ -95,70 +119,7 @@ impl Requester {
Err(er) => error!("Cannot convert response into text: {}", er),
Ok(resp) => {
let resp: Value = from_str(&resp)?;
if let Some(data) = resp.get("data") {
if let Some(parts) = data.get("participants") {
if let Some(parts) = parts.as_array() {
for participant in parts {
// save id
let id = {
if let Some(id) = participant.get("id") {
let id = serde_json::to_string(id)?.replace("\"", "");
if let Some(watermark) = participant.get("watermark") {
match watermark.as_str() {
Some(watermark) => (watermark.to_string(), id),
None => (String::new(), id),
}
} else {
error!("No `watermark` field was found in participant from {:?} conference. Skipping ...", &conf);
(String::new(), id)
}
} else {
error!("No `id` field was found in participant from {:?} conference. Skipping ...", &conf);
continue;
}
};
if let Some(params) = participant.get("params") {
let jitter = {
if let Some(jitter) = params.get("jBLen") {
// find substring
let temp = serde_json::to_string(jitter)?.replace(" ms", "").replace("\"", "");
// string to u32
if let Ok(jitter) = temp.parse::<u32>() {
jitter
} else {
error!("Invalid type of `jBLen` field for participant {:?} in conference {:?}. Skipping ...", &id, &conf);
continue;
}
} else {
error!("No `jBLen` field was found in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
continue;
}
};
// hm push
output.entry(id)
.and_modify(|map| {
map.entry(conf.clone())
.and_modify(|val| *val = jitter)
.or_insert(jitter); } )
.or_insert( {
let mut hm = OutputConferences::new();
hm.insert(conf.clone(), jitter);
hm
} );
} else {
error!("No `params` field in participant {:?} from conference {:?}. Skipping ...", &id, &conf);
}
}
}
else {
return Err(anyhow::Error::msg("Invalid JSON format: `participants` is not an array"))
}
} else {
return Err(anyhow::Error::msg("Invalid JSON format: no `participants` field"))
}
} else {
return Err(anyhow::Error::msg("Invalid JSON format: no `data` field"))
}
output.entry(conf).or_insert(resp);
}
}
},
@ -166,28 +127,94 @@ impl Requester {
}
Ok(output)
}
async fn get_metrics_from_jitter(jitter: Jitter) -> integr_structs::api::v3::PrometheusMetricsExtended {
use integr_structs::api::v3::MetricOutputExtended;
let mut metrics = PrometheusMetricsExtended::new_empty_jitter();
// for ((name, id), confs) in jitter {
// confs.iter()
// .for_each(|((conf_name, conf_id), &value)| {todo!()});
// }
jitter.iter()
.map(|(participant, confs)| (confs.iter(), participant))
.for_each(|(conf_iter, (name, id))| {
conf_iter.for_each(|((conf_name, conf_id), &jitter)| {
let metric_id = format!("j{}{}", id, conf_id);
let desc = format!("Фазовое отклонение на цифровом канале (jBLen) у пользователя `{}` (id: {}) в конференции `{}` (id: {})", name, id, conf_name, conf_id);
let val = serde_json::to_value(jitter).unwrap_or_else(|er| {
error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er);
Value::Null
});
metrics.add(MetricOutputExtended::new_with_slices(&metric_id, &metric_id,"int", "Vinteo native", &desc, None, None,val));
});
});
metrics
}
fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ {
trace!("fetching participants for conf and going across ...");
let participants_iter = conf
.get("data")
.and_then(|data| data.get("participants"))
.and_then(|participants| participants.as_array())
.into_iter()
.flatten()
.cloned();
stream! {
for participant in participants_iter {
yield Arc::new(participant);
}
}
}
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> {
trace!("extracting {} measures for current participant in current conference {} ...", targets.len(), conf_id);
let mut futures: Vec<Pin<Box<dyn futures::Future<Output = Option<MetricOutputExtended>> + Send>>> = Vec::new();
for (target, description) in targets {
futures.push(Box::pin(extract_from_participant(
parts.clone(),
target,
conf_id.clone(),
conf_desc.clone(),
description
)));
}
future::join_all(futures).await
}
// GET participants/{conference_id} data.total
async fn extract_total_participants(conferences : Arc<Value>) -> Value {
if let Some(total) = conferences
.get("data")
.and_then(|data| data.get("total")) {
return total.clone()
}
Value::Null
}
async fn extract_total_anon_participants(conferences : Arc<Value>) -> Value {
let sum = conferences
.get("data")
.and_then(|data| data.get("participants"))
.and_then(|parts| parts.as_array())
.iter()
.flat_map(|&val| val.iter())
.filter_map(|part| part.get("isAnonymous"))
.filter_map(|is_anon| is_anon.as_bool())
.filter(|is_anon| *is_anon == true)
.count();
Value::Number(sum.into())
}
// GET participants/{conference_id} data.participants[].isAnonymous
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> {
trace!("extracting `{}` measure for current participant ...", target);
if let Some(value) = participant.get("params")
.and_then(|params| params.get(target)) {
let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
let id = participant.get("id").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
let metric_name = format!("{}_{}_{}", target, conf_id, id);
return Some(
MetricOutputExtended::new_with_slices(
&metric_name,
&metric_name,
"type",
"Vinteo native",
format!(
"{} для пользователя {} ({}) в конференции {} ({})",
description,
name,
id,
conf_desc,
conf_id
).as_ref(),
Some(18),
Some("module$12".to_string()),
value.clone()
)
)
}
None
}
pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
@ -200,12 +227,85 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
let conferences = Requester::get_conferences_from_value(conferences).await?;
info!("Getting jitter metrics by user in each conference ...");
let jitter = requester.get_users_jitter_by_conference(conferences).await?;
let confs = requester.get_users_jitter_by_conference(conferences).await?;
info!("Casting jitter metrics into Metrics format for Prometheus ...");
let metrics = Requester::get_metrics_from_jitter(jitter).await;
info!("Initializing extraction mechanism ...");
let conferences = ConferencesDigital::new(confs);
let extracted_conference = conferences.go_across();
let mut buffer: Vec<MetricOutputExtended> = Vec::new();
pin_mut!(extracted_conference);
if !metrics.metrics.is_empty() {
while let Some(((desc, id), item)) = extracted_conference.next().await {
let parts_stream = across_participants(&item);
pin_mut!(parts_stream);
let total_participants = MetricOutputExtended::new_with_slices(
format!("TotalParticipants_{}", id).as_ref(),
format!("TotalParticipants_{}", id).as_ref(),
"type",
"Vinteo Native",
format!("Общее количество участников в конференции {}", &desc).as_ref(),
Some(18),
Some(String::from("module$12")),
extract_total_participants(item.clone()).await
);
// anon NON_STABLE
let total_anon_participants = MetricOutputExtended::new_with_slices(
format!("TotalAnonymousParticipants_{}", id).as_ref(),
format!("TotalAnonymousParticipants_{}", id).as_ref(),
"type",
"Vinteo Native",
format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(),
Some(18),
Some(String::from("module$12")),
extract_total_anon_participants(item.clone()).await
);
// description
buffer.push(total_participants);
buffer.push(total_anon_participants);
while let Some(participant) = parts_stream.next().await {
buffer.append(
&mut gather_futures(
participant,
Arc::from(id.as_ref()),
Arc::from(desc.as_ref()),
vec![
("txBitrate", "Скорость отправки данных"),
("rxBitrate", "Скорость приёма данных"),
("txFPS", "Количество отправляемых кадров в секунду"),
("rxFPS", "Количество получаемых кадров в секунду"),
("rxLost", "Количество потерянных пакетов на приёме"),
("txLost", "Количество потерянных пакетов на отправке"),
("jBLen", "Фазовое отклонение на цифровом канале"),
("txLostPercent", "Процент потерянных пакетов на отправке"),
("rxLostPercent", "Процент потерянных пакетов на приёме"),
("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"),
("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"),
("txVideoCodec", "Используемый видеокодек на отправке"),
("rxVideoCodec", "Используемый видеокодек на приёме"),
("txAudioCodec", "Используемый аудиокодек на отправке"),
("rxAudioCodec", "Используемый аудиокодек на приёме"),
("txResolution", "Разрешение передаваемого видео"),
("rxResolution", "Разрешение принимаемого видео"),
("txH239Resolution", "Разрешение дополнительного видео-потока"),
("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"),
("duration", "Длительсность сеанса (в минутах)"),
],
).await
.into_iter()
.filter(|metric| metric.is_some())
.map(|metric| metric.unwrap() )
.collect::<Vec<MetricOutputExtended>>()
);
}
}
// let metrics = Requester::get_metrics_from_jitter(jitter).await;
if !buffer.is_empty() {
let metrics = PrometheusMetricsExtended::new_zvks(buffer).await;
match Exporter::export_extended_metrics(metrics).await {
Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
Err(er) => error!("Cannot export data: {}", er),
@ -213,8 +313,6 @@ pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
} else {
warn!("Metrics array is empty. Ignoring exporting ...");
}
// Exporter::export_extended_metrics(metrics)
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
// get users' jitter info

View File

@ -1,6 +1,4 @@
use std::str::FromStr;
use chrono::Local;
use anyhow::Result;
use tracing::info;
@ -21,20 +19,6 @@ use tracing::info;
/// *depends on* : -
///
pub async fn setup_logger() -> Result<()> {
// Builder::new()
// .format(move |buf, record| {
// writeln!(
// buf,
// "|{}| {} [{}] - {}",
// "api-grubber",
// Local::now().format("%d-%m-%Y %H:%M:%S"),
// record.level(),
// record.args(),
// )
// })
// .filter(None, LevelFilter::Info)
// .target(env_logger::Target::Stdout)
// .init();
let log_level = std::env::var("IM_LOG_INFO").unwrap_or_else(|_| String::from("INFO"));
tracing_subscriber::fmt()

View File

@ -36,10 +36,16 @@ async fn main() -> Result<()>{
},
}
});
let request_delay = std::env::var("IM_REQUEST_DELAY")
.unwrap_or_else(|_| String::from("5"))
.parse::<u32>()
.unwrap_or_else(|_| {
warn!("No delay was set, setting up as 5 secs ..."); 5
});
let event_grub = tokio::spawn(async move {
// GRAB USING eNODE.MONITORING API GATEWAY
if std::env::var("ENODE_MONITORING_IP").is_ok() {
match get_metrics_from_monitoring(0, 5).await {
match get_metrics_from_monitoring(0, request_delay as usize).await {
Ok(_) => info!("Grabing (eNODE.Monitoring) task de-initialized"),
Err(er) => error!("Grabing task returned an error : {}", er),
}

View File

@ -79,7 +79,18 @@ pub trait LazyUnzipInstance {
impl LazyUnzipInstance for &[MetricInstance] {
fn lazy_unzip(&self) -> HashMap<String, MetricMeta> {
self.iter().map(|obj| (obj.dola_id.to_string(), MetricMeta::new(&obj.name, &obj.desc, &obj.device, &obj.source))).collect()
self.iter().map(
|obj|
(
obj.dola_id.to_string(),
MetricMeta::new(
&obj.name,
&obj.desc,
&obj.device,
&obj.source
)
)
).collect()
}
}
@ -183,6 +194,7 @@ pub struct MonitoringImporter {
login : String,
password : String,
access_token : String,
api_token : String,
ts : String,
timeout : usize,
}
@ -205,6 +217,7 @@ impl MonitoringImporter {
login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()),
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
access_token : String::new(),
api_token : env::var("ENODE_API_TOKEN").unwrap_or_else(|_| String::new()),
ts : String::new(),
timeout : std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()).parse().unwrap_or_else(|_| 10)
}
@ -213,7 +226,13 @@ impl MonitoringImporter {
/// and can be used to pull and push info to and from CM
///
async fn is_valid(&self) -> bool {
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
!self.ip.is_empty() && ((!self.login.is_empty() && !self.password.is_empty() ) || !self.api_token.is_empty())
}
/// Function that checks is current `MonitoringImporter` valid
/// and can be used to pull and push info to and from CM
///
async fn is_minimal(&self) -> bool {
(self.login.is_empty() || self.password.is_empty()) && !self.api_token.is_empty()
}
/// A setter of `timestamp`
///
@ -235,8 +254,15 @@ impl MonitoringImporter {
#[tracing::instrument(name = "cm_fn_session_start", skip_all)]
pub async fn start_session(&mut self) -> anyhow::Result<()> {
if !self.is_valid().await {
if self.is_minimal().await {
return Err(Error::msg(format!("Given API-Token is no more actual now ({})", &self.access_token)));
}
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
}
if !self.api_token.is_empty() {
std::mem::swap(&mut self.access_token, &mut self.api_token);
info!("API-Token that was in the ENODE configuration was set as access-token");
} else {
let client = Client::new();
let url = format!("http://{}/e-data-front/auth/login", self.ip);
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
@ -248,10 +274,7 @@ impl MonitoringImporter {
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
.header("Content-Type", "application/json")
.json(&fortoken);
// let resp = client.send().await?;
if let Ok(resp) = client.send().await {
// let auth = resp.json::<AuthResponse>().await?;
match resp.json::<AuthResponse>().await {
Ok(auth) => {
self.set_ts(&fortoken.ts).await;
@ -267,6 +290,7 @@ impl MonitoringImporter {
delay = delay * 2;
}
info!("Started a new CM session");
}
Ok(())
}