Compare commits
No commits in common. "master" and "hotfix/1307" have entirely different histories.
master
...
hotfix/130
|
|
@ -1,4 +0,0 @@
|
||||||
/target
|
|
||||||
Cargo.lock
|
|
||||||
*.sock
|
|
||||||
.env
|
|
||||||
21
.env.example
21
.env.example
|
|
@ -1,47 +1,26 @@
|
||||||
# Template .env for API grabber
|
# Template .env for API grabber
|
||||||
|
|
||||||
# PostgreSQL connection [DEPRECATED]
|
# PostgreSQL connection [DEPRECATED]
|
||||||
# -------------------------------
|
|
||||||
DB_HOST = "ip.addr.postgresql.server"
|
DB_HOST = "ip.addr.postgresql.server"
|
||||||
DB_USER = "db_user"
|
DB_USER = "db_user"
|
||||||
DB_PASSWORD = "db_user_password"
|
DB_PASSWORD = "db_user_password"
|
||||||
DB_DBNAME = "db_name"1
|
DB_DBNAME = "db_name"1
|
||||||
|
|
||||||
# Prometheus-Exporter info
|
# Prometheus-Exporter info
|
||||||
# -------------------------------
|
|
||||||
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
|
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
|
||||||
|
|
||||||
# VINTEO Jitter puller (needed to init Jitter native grab)
|
|
||||||
# -------------------------------
|
|
||||||
VINTEO_URL_BASE = "http(s)://ip.ip.ip.ip:port"
|
|
||||||
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/to/something"
|
|
||||||
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/to/something"
|
|
||||||
VINTEO_API_KEY = "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"
|
|
||||||
|
|
||||||
# Status Model API support
|
# Status Model API support
|
||||||
# > if exists, ignore `EXPORTER_URL` var
|
# > if exists, ignore `EXPORTER_URL` var
|
||||||
STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input"
|
STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input"
|
||||||
|
|
||||||
# eNODE.Monitoring configuration
|
# eNODE.Monitoring configuration
|
||||||
# -------------------------------
|
|
||||||
# eNODE.Monitoring server IP
|
|
||||||
ENODE_MONITORING_IP = "ip.ip.ip.ip"
|
ENODE_MONITORING_IP = "ip.ip.ip.ip"
|
||||||
# eNODE.Monitoring credentials
|
|
||||||
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
|
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
|
||||||
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required
|
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required
|
||||||
# List of target devices
|
|
||||||
ENODE_TARGET_DEVICES = "18, 19"
|
|
||||||
# to work with unlimit API-Token
|
|
||||||
ENODE_API_TOKEN = "sssswwwwaaaaffff"
|
|
||||||
|
|
||||||
# OPTIONAL SETTINGS
|
|
||||||
# -------------------------------
|
|
||||||
# IM configuration for max level of logging info
|
# IM configuration for max level of logging info
|
||||||
# for example DEBUG, INFO, WARN, ERROR, TRACE
|
# for example DEBUG, INFO, WARN, ERROR, TRACE
|
||||||
IM_LOG_INFO = "INFO"
|
IM_LOG_INFO = "INFO"
|
||||||
# IM configuration for setting up API connetion
|
# IM configuration for setting up API connetion
|
||||||
# timeout (in secs). Default value - 10
|
# timeout (in secs). Default value - 10
|
||||||
IM_CONNECTION_TIMEOUT = "10"
|
IM_CONNECTION_TIMEOUT = "10"
|
||||||
# IM configuration for delay of requests
|
|
||||||
# delay (in secs). Default value - 5
|
|
||||||
IM_REQUEST_DELAY = "20"
|
|
||||||
|
|
@ -5,11 +5,7 @@ RUN apt update && apt install -y musl-tools
|
||||||
RUN rustup target add x86_64-unknown-linux-musl
|
RUN rustup target add x86_64-unknown-linux-musl
|
||||||
|
|
||||||
COPY . .
|
COPY . .
|
||||||
|
RUN cargo test
|
||||||
ENV CARGO_HTTP_DEBUG=true
|
|
||||||
ENV CARGO_HTTP_TIMEOUT=100
|
|
||||||
|
|
||||||
RUN cargo test --verbose
|
|
||||||
RUN cargo build --release --target=x86_64-unknown-linux-musl
|
RUN cargo build --release --target=x86_64-unknown-linux-musl
|
||||||
|
|
||||||
FROM alpine:latest
|
FROM alpine:latest
|
||||||
|
|
|
||||||
158
README.md
158
README.md
|
|
@ -1,173 +1,39 @@
|
||||||
# Интеграционный модуль для проекта "Буревестник ВКС"
|
# Интеграционный модуль для проекта "Буревестник ВКС"
|
||||||
|
|
||||||
## Описание
|
## Описание
|
||||||
`Интеграционный модуль (ИМ)` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
|
`integr_mod` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
|
||||||
- получение данных через API ВКС
|
- получение данных через API ВКС
|
||||||
- поддержку хранения, валидации и актуализации собственных конфигураций
|
- поддержку хранения, валидации и актуализации собственных конфигураций
|
||||||
- предобработку полученных данных и сохранение в БД
|
- предобработку полученных данных и ~~отправку `Системе Мониторинга`~~ сохранение в БД
|
||||||
- интеграции с `еНОД.Мониторинг`
|
|
||||||
|
|
||||||
## Специфика работы
|
|
||||||
|
|
||||||
На даннный момент предусмотрено два режима работы:
|
|
||||||
|
|
||||||
1) **Нативный** - режим работы, производящий прямой опрос сервиса видео-конференц связи `Vinteo` и соотвествующий процесс `ETL`
|
|
||||||
|
|
||||||
2) **Статичный** - режим работы *"посредник"*, когда все метрические данные ВКС `Vinteo` получаются через `REST-Full API` средства `еНОД.Мониторинг`
|
|
||||||
|
|
||||||
3) **Системный** - аналогичный **статичному** режиму, но метрические данные (заведомо обогащенные нулевым статусом) отправялются не напрямую в модуль `Prometehus exporter`, а в `Статусную модель`
|
|
||||||
|
|
||||||
4) **Vinteo** - особый режим работы, предполагающий сбор определенного набора метрик напрямую с ВКС `Vinteo` механизмом многоэтапного `API-запроса`
|
|
||||||
|
|
||||||
> **Примечание**
|
|
||||||
По стандарту `ИМ` работает в **НАТИВНОМ** режиме и ожидает конфигурации в формате `.json`, однако приоритетным считается **СТАТИЧНЫЙ** режим. Подробная информация о настройке в пункте `Руководство`
|
|
||||||
|
|
||||||
## Руководство
|
## Руководство
|
||||||
|
|
||||||
В данном разделе опсиан алгоритм настройки, сборки и запуска программного модуля `ИМ`
|
1. Заполнить .env файл или установить переменные окружения в соотвествии с примером в `.env.example` файле
|
||||||
|
|
||||||
### Преднастройка
|
|
||||||
|
|
||||||
1. Выбор режима работы модуля, который скорректирует принцип настройки:
|
|
||||||
|
|
||||||
| Режим работы | .env | config-api.json | $STATUS_SYSTEM_URL | $EXPORTER_URL |
|
|
||||||
|---|---|---|---|---|
|
|
||||||
| Нативный | ❌ | ✅ | ❌ | ❌ |
|
|
||||||
| Статичный | ✅ | ❌ | ❌❌❌ | ✅ |
|
|
||||||
| Системный | ✅ | ❌ | ✅ | ❌ |
|
|
||||||
, где:
|
|
||||||
|
|
||||||
✅ -- следует настроить (предпринять)
|
|
||||||
|
|
||||||
❌ -- игнорируется системой, не стоит настраивать
|
|
||||||
|
|
||||||
❌❌❌ -- **НЕЛЬЗЯ** настраивать (предпринимать), возможны ошибки в работе
|
|
||||||
|
|
||||||
> Режим работы `Vinteo` *не описан* в таблице **намеренно**
|
|
||||||
|
|
||||||
### Настройка режима работы "Нативный"
|
|
||||||
|
|
||||||
Для настройки данного режима необходимо расположить в **активной** директории конфигурационный `config_api.json` файл:
|
|
||||||
|
|
||||||
``` json
|
|
||||||
{
|
|
||||||
"config": [
|
|
||||||
{
|
|
||||||
"id":"zvks",
|
|
||||||
"login" : "",
|
|
||||||
"pass" : "",
|
|
||||||
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
|
||||||
"period" : "",
|
|
||||||
"timeout" : "5",
|
|
||||||
"metrics" : [
|
|
||||||
{
|
|
||||||
"name": "conferences",
|
|
||||||
"url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
|
|
||||||
"measure": [
|
|
||||||
{ "id":"number", "type": "text", "addr": "data.conferences[].number" },
|
|
||||||
{ "id":"total", "type": "integer", "addr": "data.total" },
|
|
||||||
{ "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
|
|
||||||
{ "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
|
|
||||||
{ "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "abonents",
|
|
||||||
"url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
|
|
||||||
"measure": [
|
|
||||||
{ "id":"total", "type": "integer", "addr": "data.total" }
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
> **Примечание**
|
|
||||||
Название конфигурационного файла должно быть как в примере - `config_api.json`
|
|
||||||
|
|
||||||
|
|
||||||
### Настройка режима работы "Статичный"
|
|
||||||
|
|
||||||
Для настройки данного режима необходимо пополнить данными о сервере в `.env` файле по примеру:
|
|
||||||
|
|
||||||
``` toml
|
``` toml
|
||||||
...
|
# Template .env for API grabber
|
||||||
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port" # <--- экспорт данных (обязательно)
|
|
||||||
|
# Prometheus-Exporter info
|
||||||
|
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
|
||||||
|
|
||||||
# eNODE.Monitoring configuration
|
# eNODE.Monitoring configuration
|
||||||
ENODE_MONITORING_IP = "ip.ip.ip.ip"
|
ENODE_MONITORING_IP = "ip.ip.ip.ip"
|
||||||
# admin user is required
|
# admin user is required
|
||||||
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"# ---> получение данных
|
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"
|
||||||
# admin password is required
|
# admin password is required
|
||||||
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
|
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
|
||||||
...
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Настройка режима работы "Системный"
|
|
||||||
|
|
||||||
Для настройки данного режима необходимо пополнить данными о сервере в `.env` файле по примеру:
|
|
||||||
|
|
||||||
``` toml
|
|
||||||
...
|
|
||||||
STATUS_SYSTEM_URL = "http(s)://{ip}:{port}/api/input"# <--- экспорт данных
|
|
||||||
# eNODE.Monitoring configuration
|
|
||||||
ENODE_MONITORING_IP = "{ip}.{ip}.{ip}.{ip}"
|
|
||||||
# admin user is required
|
|
||||||
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"# ---> получение данных
|
|
||||||
# admin password is required
|
|
||||||
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
|
|
||||||
...
|
|
||||||
```
|
|
||||||
|
|
||||||
### Настройка режима работы "Vinteo"
|
|
||||||
|
|
||||||
Для работы в данном режиме необходимо установить переменные окружения в соотвествии со списком ниже
|
|
||||||
|
|
||||||
``` toml
|
|
||||||
...
|
|
||||||
VINTEO_URL_BASE = "https://demo.vcs.vinteo.dev"
|
|
||||||
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/conferences"
|
|
||||||
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/participants/"
|
|
||||||
VINTEO_API_KEY = "00000000000111111111.aaaaaaaaaaaaaaabbbbbbbbbbbbb"
|
|
||||||
...
|
|
||||||
```
|
|
||||||
|
|
||||||
### Настройка экспорта полученных и обработанных данных
|
|
||||||
|
|
||||||
Настройка *точки выхода* для полученных и обработанных метрик определеяется установленными в переменных окружения параметрами, варианта два:
|
|
||||||
|
|
||||||
1) **Экспорт в статусную модель** в рамках механизма сквозного прохода данных в проекте `Буревестник ВКС`
|
|
||||||
|
|
||||||
``` toml
|
|
||||||
...
|
|
||||||
STATUS_SYSTEM_URL = "{BASE_URL}/{ROUTE}"
|
|
||||||
...
|
|
||||||
```
|
|
||||||
|
|
||||||
2) **Экспорт в экспортер или иной потребитель данных**
|
|
||||||
|
|
||||||
``` toml
|
|
||||||
...
|
|
||||||
EXPORTER_URL = "{BASE_URL}/{ROUTE}"
|
|
||||||
...
|
|
||||||
```
|
|
||||||
> ### **ОЧЕНЬ ВАЖНОЕ ПРИМЕЧАНИЕ**
|
|
||||||
> ---
|
|
||||||
> Одновременное использование `$STATUS_SYSTEM_URL` и `$EXPORTER_URL` **НЕДОПУСТИМО** !! Вариант со ссылкой **на статусную модель** является _по стандарту_ **БОЛЕЕ ПРИОРИТЕТНЫМ**, второй затрется, использовать необходимо только один
|
|
||||||
|
|
||||||
2. Произвести сборку проекта командой :
|
2. Произвести сборку проекта командой :
|
||||||
``` bash
|
``` bash
|
||||||
cargo build --release
|
cargo build --release
|
||||||
```
|
```
|
||||||
|
|
||||||
3. Запустить
|
3. Запустить
|
||||||
> **Debug** версия
|
> Debug версия
|
||||||
``` bash
|
``` bash
|
||||||
cargo run --bin api-grub
|
cargo run --bin api-grub
|
||||||
```
|
```
|
||||||
или
|
или
|
||||||
> **Release** версия
|
> Release версия
|
||||||
``` bash
|
``` bash
|
||||||
cargo run --release --bin api-grub
|
cargo run --release --bin api-grub
|
||||||
```
|
```
|
||||||
|
|
@ -176,6 +42,6 @@ cargo run --release --bin api-grub
|
||||||
| Крейт (подмодуль) | Прогресс |
|
| Крейт (подмодуль) | Прогресс |
|
||||||
|---|---|
|
|---|---|
|
||||||
|`api-grub` | ✅✅✅✅✅✅✅✅✅🛠️ |
|
|`api-grub` | ✅✅✅✅✅✅✅✅✅🛠️ |
|
||||||
|`config-delivery` [migrated] | ❌❌❌❌❌❌❌❌❌❌ |
|
|`config-delivery [migrated]` | ❌❌❌❌❌❌❌❌❌❌ |
|
||||||
|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
|
|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
|
||||||
|`preproc` [temp-deprecated] | ❌❌❌❌❌❌❌❌❌❌ | (разработка временно остановлена)
|
|`preproc` [temp-deprecated] | ❌❌❌❌❌❌❌❌❌❌ | (разработка временно остановлена)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "api-grub"
|
name = "api-grub"
|
||||||
version = "1.0.15"
|
version = "1.0.2"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
authors = ["Vladislav Drozdov <maseeeeeeeed@gmail.com>"]
|
authors = ["Vladislav Drozdov <maseeeeeeeed@gmail.com>"]
|
||||||
description = "API poller for ZVKS project"
|
description = "API poller for ZVKS project"
|
||||||
|
|
@ -27,6 +27,3 @@ sysinfo = "0.33.1"
|
||||||
openssl = { version = "0.10", features = ["vendored"] }
|
openssl = { version = "0.10", features = ["vendored"] }
|
||||||
tracing-subscriber = "0.3.19"
|
tracing-subscriber = "0.3.19"
|
||||||
tracing = "0.1.41"
|
tracing = "0.1.41"
|
||||||
lazy_static = "1.5.0"
|
|
||||||
futures = "0.3.31"
|
|
||||||
async-stream = "0.3.6"
|
|
||||||
|
|
|
||||||
|
|
@ -1,319 +0,0 @@
|
||||||
use std::collections::{HashMap, HashSet};
|
|
||||||
use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use reqwest::Client;
|
|
||||||
use lazy_static::lazy_static;
|
|
||||||
use tracing::{error, info, trace, warn};
|
|
||||||
use std::pin::Pin;
|
|
||||||
use serde_json::{Value, from_str};
|
|
||||||
use crate::export::Exporter;
|
|
||||||
use async_stream::stream;
|
|
||||||
use futures::{future, pin_mut, stream::Stream, StreamExt};
|
|
||||||
|
|
||||||
lazy_static! {
|
|
||||||
static ref VINTEO_BASE: String = std::env::var("VINTEO_URL_BASE").unwrap_or_else(|_| String::from("https://demo.vcs.vinteo.dev"));
|
|
||||||
static ref CONFERENCES_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_CONFERENCES").unwrap_or_else(|_| String::from("/api/v1/conferences"));
|
|
||||||
static ref USERS_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_PARTICIPANTS").unwrap_or_else(|_| String::from("/api/v1/participants/"));
|
|
||||||
}
|
|
||||||
|
|
||||||
// conferences ids
|
|
||||||
type Conferences = HashSet<(String, String)>;
|
|
||||||
type OutputConferences = HashMap<(String, String), Value>;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct ConferencesDigital(OutputConferences);
|
|
||||||
|
|
||||||
impl ConferencesDigital {
|
|
||||||
pub fn new(item: OutputConferences) -> Self {
|
|
||||||
Self(item)
|
|
||||||
}
|
|
||||||
pub fn go_across(self) -> impl Stream<Item = ((String, String), Arc<Value>)> {
|
|
||||||
stream! {
|
|
||||||
for (key, value) in self.0 {
|
|
||||||
info!("Conference {} ({}) is processing ...", key.0, key.1);
|
|
||||||
yield (key, Arc::new(value));
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// to handle async http requests
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct Requester {
|
|
||||||
base: String,
|
|
||||||
api_key: String,
|
|
||||||
client: Client,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Requester {
|
|
||||||
pub fn new() -> anyhow::Result<Self> {
|
|
||||||
Ok(Self {
|
|
||||||
base: VINTEO_BASE.clone().to_owned(),
|
|
||||||
api_key: std::env::var("VINTEO_API_KEY")?,
|
|
||||||
client: Client::builder().user_agent("api-grub").build()?,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
pub async fn get_conferences(&self) -> anyhow::Result<Value> {
|
|
||||||
trace!("getting conferences list from API");
|
|
||||||
let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
|
|
||||||
let req = self.client.get(url)
|
|
||||||
.timeout(tokio::time::Duration::from_secs(10))
|
|
||||||
.header("accept", "application/json")
|
|
||||||
.header("x-api-key", &self.api_key);
|
|
||||||
let resp = req.send().await;
|
|
||||||
Ok(from_str(&resp?.text().await?)?)
|
|
||||||
}
|
|
||||||
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
|
|
||||||
trace!("extracting conferences list");
|
|
||||||
let mut hashset = Conferences::new();
|
|
||||||
match conferences.get("data") {
|
|
||||||
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
|
|
||||||
Some(data) => {
|
|
||||||
match data.get("conferences") {
|
|
||||||
None => return Err(anyhow::Error::msg("Invalid JSON format: no `conferences` field")),
|
|
||||||
Some(confs) => {
|
|
||||||
if let Some(confs) = confs.as_array() {
|
|
||||||
confs.iter()
|
|
||||||
.for_each(|conf_obj| {
|
|
||||||
if let Some(id) = conf_obj.get("number") {
|
|
||||||
if let Some(id) = id.as_str() {
|
|
||||||
let id = {
|
|
||||||
if let Some(desc) = conf_obj.get("description") {
|
|
||||||
match desc.as_str() {
|
|
||||||
Some(desc) => (desc.to_string(), id.to_string()),
|
|
||||||
None => (String::new(), id.to_string())
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
(String::new(), id.to_string())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
hashset.insert(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
return Err(anyhow::Error::msg("Invalid JSON format: `conferences` is not an array"))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
Ok(hashset)
|
|
||||||
}
|
|
||||||
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> {
|
|
||||||
trace!("getting users list for each conference from API ...");
|
|
||||||
let mut output = OutputConferences::new();
|
|
||||||
for conf in conferences {
|
|
||||||
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
|
|
||||||
let req = self.client.get(url)
|
|
||||||
.timeout(tokio::time::Duration::from_secs(10))
|
|
||||||
.header("accept", "application/json")
|
|
||||||
.header("x-api-key", &self.api_key);
|
|
||||||
|
|
||||||
match req.send().await {
|
|
||||||
Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er),
|
|
||||||
Ok(resp) => {
|
|
||||||
match resp.text().await {
|
|
||||||
Err(er) => error!("Cannot convert response into text: {}", er),
|
|
||||||
Ok(resp) => {
|
|
||||||
let resp: Value = from_str(&resp)?;
|
|
||||||
output.entry(conf).or_insert(resp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(output)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ {
|
|
||||||
trace!("fetching participants for conf and going across ...");
|
|
||||||
let participants_iter = conf
|
|
||||||
.get("data")
|
|
||||||
.and_then(|data| data.get("participants"))
|
|
||||||
.and_then(|participants| participants.as_array())
|
|
||||||
.into_iter()
|
|
||||||
.flatten()
|
|
||||||
.cloned();
|
|
||||||
|
|
||||||
stream! {
|
|
||||||
for participant in participants_iter {
|
|
||||||
yield Arc::new(participant);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> {
|
|
||||||
trace!("extracting {} measures for current participant in current conference {} ...", targets.len(), conf_id);
|
|
||||||
let mut futures: Vec<Pin<Box<dyn futures::Future<Output = Option<MetricOutputExtended>> + Send>>> = Vec::new();
|
|
||||||
for (target, description) in targets {
|
|
||||||
futures.push(Box::pin(extract_from_participant(
|
|
||||||
parts.clone(),
|
|
||||||
target,
|
|
||||||
conf_id.clone(),
|
|
||||||
conf_desc.clone(),
|
|
||||||
description
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
future::join_all(futures).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// GET participants/{conference_id} data.total
|
|
||||||
async fn extract_total_participants(conferences : Arc<Value>) -> Value {
|
|
||||||
if let Some(total) = conferences
|
|
||||||
.get("data")
|
|
||||||
.and_then(|data| data.get("total")) {
|
|
||||||
return total.clone()
|
|
||||||
}
|
|
||||||
Value::Null
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn extract_total_anon_participants(conferences : Arc<Value>) -> Value {
|
|
||||||
let sum = conferences
|
|
||||||
.get("data")
|
|
||||||
.and_then(|data| data.get("participants"))
|
|
||||||
.and_then(|parts| parts.as_array())
|
|
||||||
.iter()
|
|
||||||
.flat_map(|&val| val.iter())
|
|
||||||
.filter_map(|part| part.get("isAnonymous"))
|
|
||||||
.filter_map(|is_anon| is_anon.as_bool())
|
|
||||||
.filter(|is_anon| *is_anon == true)
|
|
||||||
.count();
|
|
||||||
Value::Number(sum.into())
|
|
||||||
}
|
|
||||||
|
|
||||||
// GET participants/{conference_id} data.participants[].isAnonymous
|
|
||||||
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> {
|
|
||||||
trace!("extracting `{}` measure for current participant ...", target);
|
|
||||||
if let Some(value) = participant.get("params")
|
|
||||||
.and_then(|params| params.get(target)) {
|
|
||||||
let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
|
|
||||||
let id = participant.get("id").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
|
|
||||||
let metric_name = format!("{}_{}_{}", target, conf_id, id);
|
|
||||||
return Some(
|
|
||||||
MetricOutputExtended::new_with_slices(
|
|
||||||
&metric_name,
|
|
||||||
&metric_name,
|
|
||||||
"type",
|
|
||||||
"Vinteo native",
|
|
||||||
format!(
|
|
||||||
"{} для пользователя {} ({}) в конференции {} ({})",
|
|
||||||
description,
|
|
||||||
name,
|
|
||||||
id,
|
|
||||||
conf_desc,
|
|
||||||
conf_id
|
|
||||||
).as_ref(),
|
|
||||||
Some(18),
|
|
||||||
Some("module$12".to_string()),
|
|
||||||
value.clone()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
|
|
||||||
// create requester
|
|
||||||
let requester = Requester::new()?;
|
|
||||||
// get conference id's
|
|
||||||
loop {
|
|
||||||
info!("Getting conferences array ...");
|
|
||||||
let conferences = requester.get_conferences().await?;
|
|
||||||
let conferences = Requester::get_conferences_from_value(conferences).await?;
|
|
||||||
|
|
||||||
info!("Getting jitter metrics by user in each conference ...");
|
|
||||||
let confs = requester.get_users_jitter_by_conference(conferences).await?;
|
|
||||||
|
|
||||||
info!("Initializing extraction mechanism ...");
|
|
||||||
let conferences = ConferencesDigital::new(confs);
|
|
||||||
let extracted_conference = conferences.go_across();
|
|
||||||
let mut buffer: Vec<MetricOutputExtended> = Vec::new();
|
|
||||||
pin_mut!(extracted_conference);
|
|
||||||
|
|
||||||
while let Some(((desc, id), item)) = extracted_conference.next().await {
|
|
||||||
let parts_stream = across_participants(&item);
|
|
||||||
pin_mut!(parts_stream);
|
|
||||||
|
|
||||||
let total_participants = MetricOutputExtended::new_with_slices(
|
|
||||||
format!("TotalParticipants_{}", id).as_ref(),
|
|
||||||
format!("TotalParticipants_{}", id).as_ref(),
|
|
||||||
"type",
|
|
||||||
"Vinteo Native",
|
|
||||||
format!("Общее количество участников в конференции {}", &desc).as_ref(),
|
|
||||||
Some(18),
|
|
||||||
Some(String::from("module$12")),
|
|
||||||
extract_total_participants(item.clone()).await
|
|
||||||
);
|
|
||||||
// anon NON_STABLE
|
|
||||||
let total_anon_participants = MetricOutputExtended::new_with_slices(
|
|
||||||
format!("TotalAnonymousParticipants_{}", id).as_ref(),
|
|
||||||
format!("TotalAnonymousParticipants_{}", id).as_ref(),
|
|
||||||
"type",
|
|
||||||
"Vinteo Native",
|
|
||||||
format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(),
|
|
||||||
Some(18),
|
|
||||||
Some(String::from("module$12")),
|
|
||||||
extract_total_anon_participants(item.clone()).await
|
|
||||||
);
|
|
||||||
// description
|
|
||||||
|
|
||||||
buffer.push(total_participants);
|
|
||||||
buffer.push(total_anon_participants);
|
|
||||||
|
|
||||||
while let Some(participant) = parts_stream.next().await {
|
|
||||||
buffer.append(
|
|
||||||
&mut gather_futures(
|
|
||||||
participant,
|
|
||||||
Arc::from(id.as_ref()),
|
|
||||||
Arc::from(desc.as_ref()),
|
|
||||||
vec![
|
|
||||||
("txBitrate", "Скорость отправки данных"),
|
|
||||||
("rxBitrate", "Скорость приёма данных"),
|
|
||||||
("txFPS", "Количество отправляемых кадров в секунду"),
|
|
||||||
("rxFPS", "Количество получаемых кадров в секунду"),
|
|
||||||
("rxLost", "Количество потерянных пакетов на приёме"),
|
|
||||||
("txLost", "Количество потерянных пакетов на отправке"),
|
|
||||||
("jBLen", "Фазовое отклонение на цифровом канале"),
|
|
||||||
("txLostPercent", "Процент потерянных пакетов на отправке"),
|
|
||||||
("rxLostPercent", "Процент потерянных пакетов на приёме"),
|
|
||||||
("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"),
|
|
||||||
("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"),
|
|
||||||
("txVideoCodec", "Используемый видеокодек на отправке"),
|
|
||||||
("rxVideoCodec", "Используемый видеокодек на приёме"),
|
|
||||||
("txAudioCodec", "Используемый аудиокодек на отправке"),
|
|
||||||
("rxAudioCodec", "Используемый аудиокодек на приёме"),
|
|
||||||
("txResolution", "Разрешение передаваемого видео"),
|
|
||||||
("rxResolution", "Разрешение принимаемого видео"),
|
|
||||||
("txH239Resolution", "Разрешение дополнительного видео-потока"),
|
|
||||||
("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"),
|
|
||||||
("duration", "Длительсность сеанса (в минутах)"),
|
|
||||||
],
|
|
||||||
).await
|
|
||||||
.into_iter()
|
|
||||||
.filter(|metric| metric.is_some())
|
|
||||||
.map(|metric| metric.unwrap() )
|
|
||||||
.collect::<Vec<MetricOutputExtended>>()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// let metrics = Requester::get_metrics_from_jitter(jitter).await;
|
|
||||||
|
|
||||||
if !buffer.is_empty() {
|
|
||||||
let metrics = PrometheusMetricsExtended::new_zvks(buffer).await;
|
|
||||||
match Exporter::export_extended_metrics(metrics).await {
|
|
||||||
Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
|
|
||||||
Err(er) => error!("Cannot export data: {}", er),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
warn!("Metrics array is empty. Ignoring exporting ...");
|
|
||||||
}
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
|
||||||
}
|
|
||||||
// get users' jitter info
|
|
||||||
}
|
|
||||||
|
|
@ -1,4 +1,6 @@
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use chrono::Local;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
|
|
@ -19,6 +21,20 @@ use tracing::info;
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
pub async fn setup_logger() -> Result<()> {
|
pub async fn setup_logger() -> Result<()> {
|
||||||
|
// Builder::new()
|
||||||
|
// .format(move |buf, record| {
|
||||||
|
// writeln!(
|
||||||
|
// buf,
|
||||||
|
// "|{}| {} [{}] - {}",
|
||||||
|
// "api-grubber",
|
||||||
|
// Local::now().format("%d-%m-%Y %H:%M:%S"),
|
||||||
|
// record.level(),
|
||||||
|
// record.args(),
|
||||||
|
// )
|
||||||
|
// })
|
||||||
|
// .filter(None, LevelFilter::Info)
|
||||||
|
// .target(env_logger::Target::Stdout)
|
||||||
|
// .init();
|
||||||
let log_level = std::env::var("IM_LOG_INFO").unwrap_or_else(|_| String::from("INFO"));
|
let log_level = std::env::var("IM_LOG_INFO").unwrap_or_else(|_| String::from("INFO"));
|
||||||
|
|
||||||
tracing_subscriber::fmt()
|
tracing_subscriber::fmt()
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ mod logger;
|
||||||
mod json;
|
mod json;
|
||||||
mod export;
|
mod export;
|
||||||
mod monitoring;
|
mod monitoring;
|
||||||
mod jitter;
|
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use integr_structs::api::v3::Config;
|
use integr_structs::api::v3::Config;
|
||||||
|
|
@ -13,10 +12,7 @@ use config::{pull_local_config, init_config_grub_mechanism};
|
||||||
use net::init_api_grub_mechanism;
|
use net::init_api_grub_mechanism;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
// ENODE_MONITORING_IP
|
|
||||||
use monitoring::get_metrics_from_monitoring;
|
use monitoring::get_metrics_from_monitoring;
|
||||||
// VINTEO_API_KEY
|
|
||||||
use jitter::init_grubbing_jitter;
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
async fn main() -> Result<()>{
|
async fn main() -> Result<()>{
|
||||||
|
|
@ -36,30 +32,24 @@ async fn main() -> Result<()>{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
let request_delay = std::env::var("IM_REQUEST_DELAY")
|
|
||||||
.unwrap_or_else(|_| String::from("5"))
|
|
||||||
.parse::<u32>()
|
|
||||||
.unwrap_or_else(|_| {
|
|
||||||
warn!("No delay was set, setting up as 5 secs ..."); 5
|
|
||||||
});
|
|
||||||
let event_grub = tokio::spawn(async move {
|
let event_grub = tokio::spawn(async move {
|
||||||
// GRAB USING eNODE.MONITORING API GATEWAY
|
|
||||||
if std::env::var("ENODE_MONITORING_IP").is_ok() {
|
if std::env::var("ENODE_MONITORING_IP").is_ok() {
|
||||||
match get_metrics_from_monitoring(0, request_delay as usize).await {
|
match get_metrics_from_monitoring(0, 5).await {
|
||||||
Ok(_) => info!("Grabing (eNODE.Monitoring) task de-initialized"),
|
Ok(_) => {
|
||||||
Err(er) => error!("Grabing task returned an error : {}", er),
|
info!("Grabing (eNODE.Monitoring) task deinitialized");
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Grabing task returned an error : {}", er);
|
||||||
|
},
|
||||||
}
|
}
|
||||||
// JITTER NATIVE GRAB TASK
|
|
||||||
} else if std::env::var("VINTEO_API_KEY").is_ok() {
|
|
||||||
match init_grubbing_jitter().await {
|
|
||||||
Ok(_) => info!("Grabing (Vinteo - Jitter native) task de-initialized"),
|
|
||||||
Err(er) => error!("Jitter grabing mechanism crushed : {}", er),
|
|
||||||
}
|
|
||||||
// NATIVE GRAB TASK USING `config_api.json`
|
|
||||||
} else {
|
} else {
|
||||||
match init_api_grub_mechanism(config, &mut rx).await {
|
match init_api_grub_mechanism(config, &mut rx).await {
|
||||||
Ok(_) => info!("Grabing task de-initialized"),
|
Ok(_) => {
|
||||||
Err(er) => error!("Grabing task returned an error : {}", er),
|
info!("Grabing task deinitialized");
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Grabing task returned an error : {}", er);
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ use serde_json::{Map, Value};
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, get_chunk_size};
|
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size};
|
||||||
use integr_structs::api::enode_monitoring::cmdb::Query;
|
use integr_structs::api::enode_monitoring::cmdb::Query;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
@ -13,110 +13,7 @@ use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
trait AsDeviceRequest {
|
// const IM_CONNECTION_TIMEOUT: String = std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string());
|
||||||
fn as_devices(self) -> Vec<String>;
|
|
||||||
}
|
|
||||||
trait IntoEnodeRequset {
|
|
||||||
fn into_enode_request(self) -> String;
|
|
||||||
}
|
|
||||||
impl AsDeviceRequest for Vec<String> {
|
|
||||||
fn as_devices(mut self) -> Vec<String> {
|
|
||||||
self.iter_mut()
|
|
||||||
.for_each(|dev| *dev = format!("/measures/device${}", dev));
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct MetricInstance {
|
|
||||||
dola_id : String,
|
|
||||||
name : String,
|
|
||||||
desc : String,
|
|
||||||
device : String,
|
|
||||||
source : String,
|
|
||||||
}
|
|
||||||
impl IntoEnodeRequset for &[MetricInstance] {
|
|
||||||
fn into_enode_request(self) -> String {
|
|
||||||
let mut vec: Vec<String> = Vec::new();
|
|
||||||
vec.push("%5B".to_owned());
|
|
||||||
self.iter()
|
|
||||||
.enumerate()
|
|
||||||
.for_each(|(id, val)| {
|
|
||||||
if id > 0 {
|
|
||||||
vec.push(",".to_owned());
|
|
||||||
}
|
|
||||||
vec.push(format!("%22{}%22", val.dola_id));
|
|
||||||
});
|
|
||||||
vec.push("%5D".to_owned());
|
|
||||||
vec.concat()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct MetricMeta {
|
|
||||||
name : String,
|
|
||||||
desc : String,
|
|
||||||
device : String,
|
|
||||||
source : String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for MetricMeta {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
name : String::new(),
|
|
||||||
desc : String::new(),
|
|
||||||
device : String::new(),
|
|
||||||
source : String::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(private_interfaces)]
|
|
||||||
pub trait LazyUnzipInstance {
|
|
||||||
fn lazy_unzip(&self) -> HashMap<String, MetricMeta>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LazyUnzipInstance for &[MetricInstance] {
|
|
||||||
fn lazy_unzip(&self) -> HashMap<String, MetricMeta> {
|
|
||||||
self.iter().map(
|
|
||||||
|obj|
|
|
||||||
(
|
|
||||||
obj.dola_id.to_string(),
|
|
||||||
MetricMeta::new(
|
|
||||||
&obj.name,
|
|
||||||
&obj.desc,
|
|
||||||
&obj.device,
|
|
||||||
&obj.source
|
|
||||||
)
|
|
||||||
)
|
|
||||||
).collect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MetricInstance {
|
|
||||||
fn new(id : &str, name : &str, desc : &str, device : &str, source : &str) -> Self {
|
|
||||||
Self {
|
|
||||||
dola_id : id.to_owned(),
|
|
||||||
name : name.to_owned(),
|
|
||||||
desc : desc.to_owned(),
|
|
||||||
device : device.to_owned(),
|
|
||||||
source : source.to_owned(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MetricMeta {
|
|
||||||
fn new(name : &str, desc : &str, device : &str, source : &str) -> Self {
|
|
||||||
Self {
|
|
||||||
name : name.to_owned(),
|
|
||||||
desc : desc.to_owned(),
|
|
||||||
device : device.to_owned(),
|
|
||||||
source : source.to_owned(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// # Fn `get_metrics_from_monitoring`
|
/// # Fn `get_metrics_from_monitoring`
|
||||||
///
|
///
|
||||||
|
|
@ -164,6 +61,10 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho
|
||||||
warn!("Session dropped, creating new ...");
|
warn!("Session dropped, creating new ...");
|
||||||
break 'inner;
|
break 'inner;
|
||||||
}
|
}
|
||||||
|
// if let Err(_) = a.get_measure_info(vec.clone()).await {
|
||||||
|
// warn!("Session dropped, creating new ...");
|
||||||
|
// break 'inner;
|
||||||
|
// }
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -194,7 +95,6 @@ pub struct MonitoringImporter {
|
||||||
login : String,
|
login : String,
|
||||||
password : String,
|
password : String,
|
||||||
access_token : String,
|
access_token : String,
|
||||||
api_token : String,
|
|
||||||
ts : String,
|
ts : String,
|
||||||
timeout : usize,
|
timeout : usize,
|
||||||
}
|
}
|
||||||
|
|
@ -217,7 +117,6 @@ impl MonitoringImporter {
|
||||||
login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()),
|
login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()),
|
||||||
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
|
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
|
||||||
access_token : String::new(),
|
access_token : String::new(),
|
||||||
api_token : env::var("ENODE_API_TOKEN").unwrap_or_else(|_| String::new()),
|
|
||||||
ts : String::new(),
|
ts : String::new(),
|
||||||
timeout : std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()).parse().unwrap_or_else(|_| 10)
|
timeout : std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()).parse().unwrap_or_else(|_| 10)
|
||||||
}
|
}
|
||||||
|
|
@ -226,13 +125,7 @@ impl MonitoringImporter {
|
||||||
/// and can be used to pull and push info to and from CM
|
/// and can be used to pull and push info to and from CM
|
||||||
///
|
///
|
||||||
async fn is_valid(&self) -> bool {
|
async fn is_valid(&self) -> bool {
|
||||||
!self.ip.is_empty() && ((!self.login.is_empty() && !self.password.is_empty() ) || !self.api_token.is_empty())
|
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
|
||||||
}
|
|
||||||
/// Function that checks is current `MonitoringImporter` valid
|
|
||||||
/// and can be used to pull and push info to and from CM
|
|
||||||
///
|
|
||||||
async fn is_minimal(&self) -> bool {
|
|
||||||
(self.login.is_empty() || self.password.is_empty()) && !self.api_token.is_empty()
|
|
||||||
}
|
}
|
||||||
/// A setter of `timestamp`
|
/// A setter of `timestamp`
|
||||||
///
|
///
|
||||||
|
|
@ -254,15 +147,8 @@ impl MonitoringImporter {
|
||||||
#[tracing::instrument(name = "cm_fn_session_start", skip_all)]
|
#[tracing::instrument(name = "cm_fn_session_start", skip_all)]
|
||||||
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
||||||
if !self.is_valid().await {
|
if !self.is_valid().await {
|
||||||
if self.is_minimal().await {
|
|
||||||
return Err(Error::msg(format!("Given API-Token is no more actual now ({})", &self.access_token)));
|
|
||||||
}
|
|
||||||
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
||||||
}
|
}
|
||||||
if !self.api_token.is_empty() {
|
|
||||||
std::mem::swap(&mut self.access_token, &mut self.api_token);
|
|
||||||
info!("API-Token that was in the ENODE configuration was set as access-token");
|
|
||||||
} else {
|
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
||||||
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
||||||
|
|
@ -274,7 +160,10 @@ impl MonitoringImporter {
|
||||||
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
|
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
|
||||||
.header("Content-Type", "application/json")
|
.header("Content-Type", "application/json")
|
||||||
.json(&fortoken);
|
.json(&fortoken);
|
||||||
|
// let resp = client.send().await?;
|
||||||
if let Ok(resp) = client.send().await {
|
if let Ok(resp) = client.send().await {
|
||||||
|
// let auth = resp.json::<AuthResponse>().await?;
|
||||||
|
|
||||||
match resp.json::<AuthResponse>().await {
|
match resp.json::<AuthResponse>().await {
|
||||||
Ok(auth) => {
|
Ok(auth) => {
|
||||||
self.set_ts(&fortoken.ts).await;
|
self.set_ts(&fortoken.ts).await;
|
||||||
|
|
@ -290,7 +179,6 @@ impl MonitoringImporter {
|
||||||
delay = delay * 2;
|
delay = delay * 2;
|
||||||
}
|
}
|
||||||
info!("Started a new CM session");
|
info!("Started a new CM session");
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -301,63 +189,32 @@ impl MonitoringImporter {
|
||||||
/// , where `(String, String)` is a tuple of measure `id` and `description`
|
/// , where `(String, String)` is a tuple of measure `id` and `description`
|
||||||
/// (`name`)
|
/// (`name`)
|
||||||
#[tracing::instrument(name = "CM get metrics list mechanism", skip_all)]
|
#[tracing::instrument(name = "CM get metrics list mechanism", skip_all)]
|
||||||
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<MetricInstance>> {
|
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
|
||||||
tracing::trace!("Trying ti get measures list from CM ...");
|
tracing::trace!("Trying ti get measures list from CM ...");
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let mut vec: Vec<MetricInstance> = Vec::new();
|
let mut vec: Vec<(String, String)> = Vec::new();
|
||||||
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
||||||
let id_list = {
|
|
||||||
match std::env::var("ENODE_TARGET_DEVICES") {
|
|
||||||
Err(_) => vec![String::from("18"), String::from("19")],
|
|
||||||
Ok(var) => var.split(',').into_iter().map(|st| st.trim().to_string()).collect::<Vec<String>>(),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let list_of_devices = id_list.clone().as_devices();
|
|
||||||
let client = client
|
let client = client
|
||||||
.post(url)
|
.post(url)
|
||||||
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
|
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
|
||||||
.header("Content-Type", "application/json")
|
.header("Content-Type", "application/json")
|
||||||
.bearer_auth(&self.access_token)
|
.header("access-token", &self.access_token)
|
||||||
.json(&Query::device_oriented(list_of_devices));
|
.json(&Query::default());
|
||||||
|
|
||||||
let resp = client.send().await?.text().await?;
|
let resp = client.send().await?.text().await?;
|
||||||
let resp: Value = serde_json::from_str(&resp)?;
|
let resp: Value = serde_json::from_str(&resp)?;
|
||||||
|
|
||||||
if let Some(arr) = resp.as_array() {
|
if let Some(arr) = resp.as_array() {
|
||||||
for device in arr {
|
for measure in arr {
|
||||||
let device_id = {
|
let id = measure.get("id");
|
||||||
match device.get("name") {
|
let cls = measure.get("cls");
|
||||||
Some(name) => {
|
let name = measure.get("name");
|
||||||
match serde_json::to_string(name) {
|
if id.is_some() && cls.is_some() {
|
||||||
Ok(name) => {
|
let id = id.unwrap().as_i64().unwrap_or_default();
|
||||||
name.split('$').last().unwrap_or_else(|| "undefined-device").to_owned()
|
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
||||||
},
|
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
|
||||||
Err(_) => "undefined-device".to_string(),
|
if cls.is_empty() {
|
||||||
}
|
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
|
||||||
},
|
|
||||||
None => "undefined-device".to_string(),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let device_id = device_id.trim_end_matches('"');
|
|
||||||
if let Some(links) = device.get("links") {
|
|
||||||
if let Some(measures) = links.as_array() {
|
|
||||||
for measure in measures.iter() {
|
|
||||||
let dola_id = measure.get("id");
|
|
||||||
let id = measure.get("measure_id");
|
|
||||||
let source = measure.get("source_id");
|
|
||||||
let desc = measure.get("name");
|
|
||||||
if id.is_some() && source.is_some() && dola_id.is_some() {
|
|
||||||
let dola_id = format!("measure${}", dola_id.unwrap().as_i64().unwrap_or_else(|| 0));
|
|
||||||
let id = id.unwrap().as_str().unwrap_or_else(|| "no-name");
|
|
||||||
let source = source.unwrap().as_str().unwrap_or_else(|| "no-source");
|
|
||||||
let desc = desc.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "no description");
|
|
||||||
if source.is_empty() {
|
|
||||||
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`measure_id` or `source_id`)"));
|
|
||||||
}
|
|
||||||
vec.push(MetricInstance::new(&dola_id, id, desc, device_id.as_ref(), source));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
vec.push((format!("{}${}", cls, id), name.to_string()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -383,8 +240,8 @@ impl MonitoringImporter {
|
||||||
/// exprots all data by itselfs
|
/// exprots all data by itselfs
|
||||||
///
|
///
|
||||||
#[tracing::instrument(name = "CM get measures info mechanism", skip_all)]
|
#[tracing::instrument(name = "CM get measures info mechanism", skip_all)]
|
||||||
pub async fn get_measure_info(&self, measures: Arc<Vec<MetricInstance>>) -> anyhow::Result<()> {
|
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
|
||||||
tracing::trace!("Trying to get info about each measure ...");
|
tracing::trace!("Trying ti get info about each measure ...");
|
||||||
let mut sys = sysinfo::System::new();
|
let mut sys = sysinfo::System::new();
|
||||||
sys.refresh_cpu_all();
|
sys.refresh_cpu_all();
|
||||||
// adaptive permition on task spawm to prevent system overload
|
// adaptive permition on task spawm to prevent system overload
|
||||||
|
|
@ -401,7 +258,7 @@ impl MonitoringImporter {
|
||||||
let arc = arc.clone();
|
let arc = arc.clone();
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
let hm = measure.lazy_unzip();
|
let hm = measure.lazy_unzip();
|
||||||
let measure = Arc::new(measure.into_enode_request());
|
let measure = Arc::new(measure.display());
|
||||||
let _permit = permit.acquire().await.unwrap();
|
let _permit = permit.acquire().await.unwrap();
|
||||||
|
|
||||||
let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
|
let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
|
||||||
|
|
@ -447,14 +304,14 @@ impl MonitoringImporter {
|
||||||
measure: Arc<String>,
|
measure: Arc<String>,
|
||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
arc: Arc<Self>,
|
arc: Arc<Self>,
|
||||||
hm: &HashMap<String, MetricMeta>,
|
hm: &HashMap<String, String>,
|
||||||
) -> anyhow::Result<PrometheusMetricsExtended> {
|
) -> anyhow::Result<PrometheusMetricsExtended> {
|
||||||
tracing::trace!("Processing CM endpoint with one or more measure names");
|
tracing::trace!("Processing CM endpoint with one or more measure names");
|
||||||
let resp = client
|
let resp = client
|
||||||
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
||||||
.timeout(tokio::time::Duration::from_secs(arc.timeout as u64))
|
.timeout(tokio::time::Duration::from_secs(arc.timeout as u64))
|
||||||
.header("Content-Type", "application/json")
|
.header("Content-Type", "application/json")
|
||||||
.bearer_auth(&arc.access_token)
|
.header("access-token", &arc.access_token)
|
||||||
.send().await?
|
.send().await?
|
||||||
.text().await?;
|
.text().await?;
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
|
|
@ -479,7 +336,7 @@ impl MonitoringImporter {
|
||||||
///
|
///
|
||||||
/// 3) if `Value` is `_` -> returns error **Invalid JSON format**
|
/// 3) if `Value` is `_` -> returns error **Invalid JSON format**
|
||||||
///
|
///
|
||||||
fn extract_metric_data(json: Value, hm: &HashMap<String, MetricMeta>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
|
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
return match json {
|
return match json {
|
||||||
Value::Object(obj) => {
|
Value::Object(obj) => {
|
||||||
|
|
@ -504,48 +361,53 @@ impl MonitoringImporter {
|
||||||
///
|
///
|
||||||
/// Searches for certain fields and aggregates it in the `MetricOutputExtended`
|
/// Searches for certain fields and aggregates it in the `MetricOutputExtended`
|
||||||
/// object
|
/// object
|
||||||
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, MetricMeta>) -> anyhow::Result<MetricOutputExtended> {
|
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
|
||||||
tracing::trace!("Processing atomic Object value in CM JSON-response");
|
tracing::trace!("Processing atomic Object value in CM JSON-response");
|
||||||
let id = obj.get("$id");
|
let id = obj.get("$id");
|
||||||
let val = obj.get("value");
|
let val = obj.get("value");
|
||||||
|
let description = {
|
||||||
|
let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null);
|
||||||
|
let zero = String::new();
|
||||||
|
if dola_ip.is_null() {
|
||||||
|
zero
|
||||||
|
} else {
|
||||||
|
hm.get(
|
||||||
|
dola_ip.as_str().unwrap_or_else(|| "")
|
||||||
|
)
|
||||||
|
.unwrap_or_else(|| &zero)
|
||||||
|
.to_owned()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if id.is_none() || val.is_none() {
|
if id.is_none() || val.is_none() {
|
||||||
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
|
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
|
||||||
}
|
}
|
||||||
|
let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_");
|
||||||
let id = id.unwrap().as_str().unwrap_or_else(|| "");
|
|
||||||
let default_meta = MetricMeta::default();
|
|
||||||
let meta = hm.get(id).unwrap_or_else(|| &default_meta);
|
|
||||||
let id = id.replace("$", "_");
|
|
||||||
let val = val.unwrap();
|
let val = val.unwrap();
|
||||||
let device = meta.device.parse::<usize>().unwrap_or_else(|_| 0);
|
|
||||||
|
|
||||||
if id.is_empty() {
|
if id.is_empty() {
|
||||||
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
|
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
|
||||||
}
|
}
|
||||||
Ok(MetricOutputExtended::new_with_slices(
|
|
||||||
id.as_ref(),
|
Ok(MetricOutputExtended {
|
||||||
&meta.name,
|
id : id.to_owned(),
|
||||||
{
|
json_type : match val {
|
||||||
match val {
|
|
||||||
Value::Number(val) => {
|
Value::Number(val) => {
|
||||||
if val.is_i64() {
|
if val.is_i64() {
|
||||||
"i64"
|
"i64".to_owned()
|
||||||
} else if val.is_u64() {
|
} else if val.is_u64() {
|
||||||
"u64"
|
"u64".to_owned()
|
||||||
} else {
|
} else {
|
||||||
"f64"
|
"f64".to_owned()
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => "unknown",
|
_ => "unknown".to_owned(),
|
||||||
}
|
|
||||||
},
|
},
|
||||||
"enode.monitoring.api",
|
addr : "enode.monitoring.api".to_owned(),
|
||||||
&meta.desc,
|
desc : description,
|
||||||
Some(device),
|
value : val.clone(),
|
||||||
Some(meta.source.clone()),
|
status: 0,
|
||||||
val.clone(),
|
})
|
||||||
))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -97,8 +97,9 @@ impl<'a> ApiPoll<'a> {
|
||||||
error!("Bad JSON in response. Error: {}", er);
|
error!("Bad JSON in response. Error: {}", er);
|
||||||
},
|
},
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
|
let endpoint_name = &metrics.name;
|
||||||
let preproc = JsonParser::parse(&metrics.measure, &response);
|
let preproc = JsonParser::parse(&metrics.measure, &response);
|
||||||
let preproc = PrometheusMetrics::new(&service_id, preproc);
|
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
|
||||||
match Exporter::export_metrics(preproc).await {
|
match Exporter::export_metrics(preproc).await {
|
||||||
Ok(bytes) => {
|
Ok(bytes) => {
|
||||||
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);
|
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,5 @@ publish = ["kellnr"]
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.95"
|
anyhow = "1.0.95"
|
||||||
chrono = "0.4.40"
|
chrono = "0.4.40"
|
||||||
dotenv = "0.15.0"
|
|
||||||
serde = { version = "1.0.217", features = ["derive"] }
|
serde = { version = "1.0.217", features = ["derive"] }
|
||||||
serde_json = "1.0.135"
|
serde_json = "1.0.135"
|
||||||
|
|
|
||||||
|
|
@ -265,7 +265,6 @@ pub mod v3 {
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub struct MetricOutputExtended {
|
pub struct MetricOutputExtended {
|
||||||
pub id : String,
|
pub id : String,
|
||||||
pub name : String,
|
|
||||||
#[serde(rename = "type")]
|
#[serde(rename = "type")]
|
||||||
pub json_type : String,
|
pub json_type : String,
|
||||||
pub addr : String,
|
pub addr : String,
|
||||||
|
|
@ -273,20 +272,15 @@ pub mod v3 {
|
||||||
#[serde(rename = "description")]
|
#[serde(rename = "description")]
|
||||||
pub desc : String,
|
pub desc : String,
|
||||||
pub status: usize,
|
pub status: usize,
|
||||||
pub device: Option<usize>,
|
|
||||||
pub source: Option<String>,
|
|
||||||
}
|
}
|
||||||
impl MetricOutputExtended {
|
impl MetricOutputExtended {
|
||||||
pub fn new_with_slices(id : &str, name: &str, json_type : &str, addr: &str, desc : &str, device: Option<usize>, source: Option<String>, value : Value) -> Self {
|
pub fn new_with_slices(id : &str, json_type : &str, addr: &str, desc : &str, value : Value) -> Self {
|
||||||
MetricOutputExtended {
|
MetricOutputExtended {
|
||||||
id : id.to_string(),
|
id : id.to_string(),
|
||||||
name : name.to_string(),
|
|
||||||
json_type : json_type.to_string(),
|
json_type : json_type.to_string(),
|
||||||
addr : addr.to_string(),
|
addr : addr.to_string(),
|
||||||
value : value,
|
value : value,
|
||||||
desc : desc.to_string(),
|
desc : desc.to_string(),
|
||||||
device,
|
|
||||||
source,
|
|
||||||
status: 0,
|
status: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -295,21 +289,21 @@ pub mod v3 {
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub struct PrometheusMetrics {
|
pub struct PrometheusMetrics {
|
||||||
pub service_name: String,
|
pub service_name: String,
|
||||||
// pub endpoint_name: String,
|
pub endpoint_name: String,
|
||||||
pub metrics: Vec<MetricOutput>,
|
pub metrics: Vec<MetricOutput>,
|
||||||
}
|
}
|
||||||
impl PrometheusMetrics {
|
impl PrometheusMetrics {
|
||||||
pub fn new(service: &str, metrics: Vec<MetricOutput>) -> Self {
|
pub fn new(service: &str, endpoint: &str, metrics: Vec<MetricOutput>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
service_name: service.to_string(),
|
service_name: service.to_string(),
|
||||||
// endpoint_name: endpoint.to_string(),
|
endpoint_name: endpoint.to_string(),
|
||||||
metrics: metrics
|
metrics: metrics
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub async fn new_zvks(metrics: Vec<MetricOutput>) -> Self {
|
pub async fn new_zvks(metrics: Vec<MetricOutput>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
service_name : "zvks".to_owned(),
|
service_name : "zvks".to_owned(),
|
||||||
// endpoint_name : "apiforsnmp".to_owned(),
|
endpoint_name : "apiforsnmp".to_owned(),
|
||||||
metrics : metrics,
|
metrics : metrics,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -323,21 +317,14 @@ pub mod v3 {
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub struct PrometheusMetricsExtended {
|
pub struct PrometheusMetricsExtended {
|
||||||
pub service_name: String,
|
pub service_name: String,
|
||||||
|
pub endpoint_name: String,
|
||||||
pub metrics: Vec<MetricOutputExtended>,
|
pub metrics: Vec<MetricOutputExtended>,
|
||||||
}
|
}
|
||||||
impl PrometheusMetricsExtended {
|
impl PrometheusMetricsExtended {
|
||||||
pub fn new_empty_jitter() -> Self {
|
|
||||||
Self {
|
|
||||||
service_name : "zvks".to_owned(),
|
|
||||||
metrics : Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub fn add(&mut self, metric: MetricOutputExtended) {
|
|
||||||
self.metrics.push(metric);
|
|
||||||
}
|
|
||||||
pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
|
pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
service_name : "zvks".to_owned(),
|
service_name : "zvks".to_owned(),
|
||||||
|
endpoint_name : "apiforsnmp".to_owned(),
|
||||||
metrics : metrics,
|
metrics : metrics,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -380,18 +367,18 @@ pub mod enode_monitoring {
|
||||||
pub struct Query {
|
pub struct Query {
|
||||||
id : Vec<String>,
|
id : Vec<String>,
|
||||||
data : Data,
|
data : Data,
|
||||||
// #[serde(rename = "postQuery")]
|
#[serde(rename = "postQuery")]
|
||||||
// post_query : String,
|
post_query : String,
|
||||||
#[serde(rename = "enableActions")]
|
#[serde(rename = "enableActions")]
|
||||||
enable_actions : bool,
|
enable_actions : bool,
|
||||||
ts : usize
|
ts : usize
|
||||||
}
|
}
|
||||||
impl Query {
|
impl Default for Query {
|
||||||
pub fn device_oriented(devices: Vec<String>) -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
id : devices,
|
id : vec!["/measures/device$18".to_owned()],
|
||||||
data : Data::default(),
|
data : Data::default(),
|
||||||
// post_query : "links".to_owned(),
|
post_query : "links".to_owned(),
|
||||||
enable_actions : false,
|
enable_actions : false,
|
||||||
ts : 1740060679399
|
ts : 1740060679399
|
||||||
}
|
}
|
||||||
|
|
@ -419,7 +406,7 @@ pub mod enode_monitoring {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
flatten : true,
|
flatten : true,
|
||||||
filter : Filter::default(),
|
filter : Filter::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -489,9 +476,14 @@ pub mod enode_monitoring {
|
||||||
|
|
||||||
pub fn get_chunk_size(total_measures: usize) -> usize {
|
pub fn get_chunk_size(total_measures: usize) -> usize {
|
||||||
match total_measures {
|
match total_measures {
|
||||||
0..=432 => total_measures,
|
0..=144 => total_measures,
|
||||||
433..=1008 => total_measures / 2,
|
145..=288 => total_measures / 4,
|
||||||
_ => total_measures / 4,
|
289..=432 => total_measures / 5,
|
||||||
|
433..=576 => total_measures / 6,
|
||||||
|
577..=720 => total_measures / 7,
|
||||||
|
721..=864 => total_measures / 8,
|
||||||
|
865..=1008 => total_measures / 9,
|
||||||
|
_ => total_measures / 10,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue