Compare commits
No commits in common. "master" and "v1.0.0" have entirely different histories.
.gitignore
@@ -1,4 +0,0 @@
/target
Cargo.lock
*.sock
.env

.env.example
@@ -1,47 +1,15 @@
# Template .env for API grabber

# PostgreSQL connection [DEPRECATED]
# -------------------------------
DB_HOST = "ip.addr.postgresql.server"
DB_USER = "db_user"
DB_PASSWORD = "db_user_password"
DB_DBNAME = "db_name"

# Prometheus-Exporter info
# -------------------------------
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"

# VINTEO Jitter puller (needed to init Jitter native grab)
# -------------------------------
VINTEO_URL_BASE = "http(s)://ip.ip.ip.ip:port"
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/to/something"
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/to/something"
VINTEO_API_KEY = "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"

# Status Model API support
# > if set, the `EXPORTER_URL` var is ignored
STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input"

# eNODE.Monitoring configuration
# -------------------------------
# eNODE.Monitoring server IP
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# eNODE.Monitoring credentials
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # admin password is required
# List of target devices
ENODE_TARGET_DEVICES = "18, 19"
# to work with an unlimited API token
ENODE_API_TOKEN = "sssswwwwaaaaffff"

# OPTIONAL SETTINGS
# -------------------------------
# IM configuration for the maximum logging level
# for example DEBUG, INFO, WARN, ERROR, TRACE
IM_LOG_INFO = "INFO"
# IM configuration for setting up the API connection
# timeout (in secs). Default value - 10
IM_CONNECTION_TIMEOUT = "10"
# IM configuration for the delay between requests
# delay (in secs). Default value - 5
IM_REQUEST_DELAY = "20"
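How the optional `IM_*` knobs above are consumed can be seen in `main.rs` later in this diff; a minimal stand-alone sketch of the same parse-with-default pattern (the helper names are illustrative, not from the repo):

``` rust
use std::env;

// Hypothetical helpers mirroring the repo's pattern: read the var,
// fall back to the documented default when it is unset or unparsable.
fn request_delay_secs() -> u64 {
    env::var("IM_REQUEST_DELAY")
        .unwrap_or_else(|_| String::from("5")) // default value - 5
        .parse()
        .unwrap_or(5)
}

fn connection_timeout_secs() -> u64 {
    env::var("IM_CONNECTION_TIMEOUT")
        .unwrap_or_else(|_| String::from("10")) // default value - 10
        .parse()
        .unwrap_or(10)
}

fn main() {
    println!("delay = {}s, timeout = {}s", request_delay_secs(), connection_timeout_secs());
}
```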
Cargo.toml (workspace)
@@ -1,7 +1,7 @@
[workspace]
resolver = "2"
members = [
    "crates/api-grub", "crates/integr-structs",
    "crates/api-grub", "crates/integr-structs", "crates/preproc",
]

[profile.dev]
Dockerfile
@@ -5,11 +5,7 @@ RUN apt update && apt install -y musl-tools
RUN rustup target add x86_64-unknown-linux-musl

COPY . .

ENV CARGO_HTTP_DEBUG=true
ENV CARGO_HTTP_TIMEOUT=100

RUN cargo test --verbose
RUN cargo test
RUN cargo build --release --target=x86_64-unknown-linux-musl

FROM alpine:latest
Jenkinsfile
@@ -1,24 +1,3 @@
def notify(
    String context,
    String giteaUser,
    String giteaPass,
    String repositoryUrl,
    String repositoryName,
    String commitHash,
    String buildStatus
) {
    def status = buildStatus == 'success' ? 'success' : 'failure'
    def description = buildStatus == 'success' ? 'Build succeeded' : 'Build failed'

    sh """
        curl -X POST \
            -u "${giteaUser}:${giteaPass}" \
            -H "Content-Type: application/json" \
            -d '{"context":"${context}","state": "${status}", "description": "${description}"}' \
            ${repositoryUrl}deployer3000/${repositoryName}/statuses/${commitHash}
    """
}

pipeline {
    agent any
    environment {
@@ -34,22 +13,7 @@ pipeline {
        }
        steps {
            script {
                def hasTags = sh(script: "git tag -l | wc -l", returnStdout: true).trim().toInteger() > 0

                def lastVersion = "0.0.0"

                if (hasTags) {
                    lastVersion = sh(script: "git describe --tags --abbrev=0", returnStdout: true).trim()
                }

                echo "Last version: ${lastVersion}"

                def (major, minor, patch) = lastVersion.tokenize('.')
                def newVersion = "${major}.${minor}.${patch.toInteger() + 1}"
                echo "New version: ${newVersion}"

                env.IMAGE_TAG = newVersion
                env.NEW_VERSION = newVersion
                env.IMAGE_TAG = sh(script: "git describe --tags --abbrev=0", returnStdout: true).trim()
            }
        }
    }
@@ -94,30 +58,14 @@ pipeline {
                echo "Attempting to merge PR ${env.CHANGE_ID} into master..."
                withCredentials([usernamePassword(credentialsId: 'gitea_creds', usernameVariable: 'GITEA_USER', passwordVariable: 'GITEA_PASS')]) {
                    def prId = env.CHANGE_ID

                    sh """
                        curl -X POST \
                            -u "${GITEA_USER}:${GITEA_PASS}" \
                            -H "Content-Type: application/json" \
                            -d '{"do":"merge"}' \
                            http://git.entcor/api/v1/repos/deployer3000/${env.IMAGE_NAME}/pulls/${prId}/merge
                            http://git.entcor/api/v1/repos/deployer3000/integration-module/pulls/${prId}/merge
                    """
                    def commitHash = sh(script: "git rev-parse HEAD~1", returnStdout: true).trim() // needed for correct status reporting
                    echo "PR ${prId} merged successfully into master!"
                    sleep(time: 15, unit: 'SECONDS')
                    sh "git checkout master && git pull origin master"

                    sh """
                        curl -v -X POST -u "${GITEA_USER}:${GITEA_PASS}" \
                            -H "Content-Type: application/json" \
                            -d '{"tag_name": "${env.NEW_VERSION}", "name": "Release ${env.NEW_VERSION}", "target_commitish": "master"}' \
                            "${env.GITEA_REPOSITORY_URL}deployer3000/${env.IMAGE_NAME}/releases"
                    """

                    echo "New release succeeded!"

                    def context = "test-org/${env.IMAGE_NAME}/pipeline/pr-${env.CHANGE_TARGET}"
                    notify(context, GITEA_USER, GITEA_PASS, env.GITEA_REPOSITORY_URL, env.IMAGE_NAME, commitHash, "success")
                }
            }
        }

README.md
@@ -1,181 +1,15 @@
# Integration module for the "Буревестник ВКС" project

## Description
`Интеграционный модуль (ИМ)` (Integration Module, IM) - a Rust package providing the integration-module functionality in the "Буревестник ВКС" project, consisting of binary crates for:
`integr_mod` - a Rust package providing the integration-module functionality in the "Буревестник ВКС" project, consisting of binary crates for:
- fetching data through the VCS API
- storing, validating and keeping its own configurations up to date
- preprocessing the fetched data and saving it to the DB
- integration with `еНОД.Мониторинг`
- preprocessing the fetched data and ~~sending it to the `Monitoring System`~~ saving it to the DB

## Operating specifics
## Current Progress

The following operating modes are currently provided:

1) **Native** - a mode that polls the `Vinteo` video-conferencing (VCS) service directly and runs the corresponding `ETL` process

2) **Static** - a *"middleman"* mode in which all `Vinteo` VCS metric data is obtained through the `REST-Full API` of `еНОД.Мониторинг`

3) **System** - the same as the **static** mode, except that the metric data (pre-enriched with a zero status) is sent not directly to the `Prometheus exporter` module but to the `Status Model`

4) **Vinteo** - a special mode that collects a specific set of metrics straight from the `Vinteo` VCS via a multi-stage `API request` mechanism

> **Note**
By default the `ИМ` runs in **NATIVE** mode and expects a configuration in `.json` format; however, the **STATIC** mode is considered the priority one. See the `Manual` section for configuration details
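The mode is not selected by an explicit switch: `main.rs` (shown later in this diff) picks it from which env vars are present. A simplified sketch of that dispatch (the static/system split is decided separately by `$STATUS_SYSTEM_URL`, see `export.rs`):

``` rust
// Sketch of the dispatch in `main.rs`; the priority order matters.
fn choose_mode() -> &'static str {
    if std::env::var("ENODE_MONITORING_IP").is_ok() {
        "static/system" // grab through the eNODE.Monitoring API gateway
    } else if std::env::var("VINTEO_API_KEY").is_ok() {
        "vinteo" // jitter grab straight from the Vinteo VCS
    } else {
        "native" // fall back to the local `config_api.json`
    }
}
```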
## Manual

This section describes how to configure, build and run the `ИМ` software module

### Pre-configuration

1. Choose the module's operating mode, which determines how the configuration is done:

| Operating mode | .env | config-api.json | $STATUS_SYSTEM_URL | $EXPORTER_URL |
|---|---|---|---|---|
| Native | ❌ | ✅ | ❌ | ❌ |
| Static | ✅ | ❌ | ❌❌❌ | ✅ |
| System | ✅ | ❌ | ✅ | ❌ |

where:

✅ -- should be configured (acted upon)

❌ -- ignored by the system, no need to configure it

❌❌❌ -- **MUST NOT** be configured (acted upon); it may cause runtime errors

> The `Vinteo` mode is **intentionally** *not described* in the table

### Configuring the "Native" mode

To configure this mode, place a `config_api.json` configuration file in the **working** directory:

``` json
{
    "config": [
        {
            "id":"zvks",
            "login" : "",
            "pass" : "",
            "api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
            "period" : "",
            "timeout" : "5",
            "metrics" : [
                {
                    "name": "conferences",
                    "url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
                    "measure": [
                        { "id":"number", "type": "text", "addr": "data.conferences[].number" },
                        { "id":"total", "type": "integer", "addr": "data.total" },
                        { "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
                        { "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
                        { "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
                    ]
                },
                {
                    "name": "abonents",
                    "url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
                    "measure": [
                        { "id":"total", "type": "integer", "addr": "data.total" }
                    ]
                }
            ]
        }
    ]
}
```

> **Note**
The configuration file must be named exactly as in the example - `config_api.json`


### Configuring the "Static" mode

To configure this mode, add the server details to the `.env` file as in the example:

``` toml
...
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port" # <--- data export (required)
# eNODE.Monitoring configuration
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # ---> data ingestion
# admin password is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
...
```

### Configuring the "System" mode

To configure this mode, add the server details to the `.env` file as in the example:

``` toml
...
STATUS_SYSTEM_URL = "http(s)://{ip}:{port}/api/input" # <--- data export
# eNODE.Monitoring configuration
ENODE_MONITORING_IP = "{ip}.{ip}.{ip}.{ip}"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # ---> data ingestion
# admin password is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
...
```

### Configuring the "Vinteo" mode

To run in this mode, set the environment variables according to the list below

``` toml
...
VINTEO_URL_BASE = "https://demo.vcs.vinteo.dev"
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/conferences"
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/participants/"
VINTEO_API_KEY = "00000000000111111111.aaaaaaaaaaaaaaabbbbbbbbbbbbb"
...
```

### Configuring export of the fetched and processed data

The *exit point* for the fetched and processed metrics is determined by the parameters set in the environment variables; there are two options:

1) **Export to the status model** within the end-to-end data-flow mechanism of the `Буревестник ВКС` project

``` toml
...
STATUS_SYSTEM_URL = "{BASE_URL}/{ROUTE}"
...
```

2) **Export to the exporter or another data consumer**

``` toml
...
EXPORTER_URL = "{BASE_URL}/{ROUTE}"
...
```
> ### **VERY IMPORTANT NOTE**
> ---
> Using `$STATUS_SYSTEM_URL` and `$EXPORTER_URL` at the same time is **NOT ALLOWED**!! The **status model** link is *by default* the **HIGHER-PRIORITY** one and the other will be overridden; only one of the two must be used
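The priority described in the note matches the lookup in `export.rs` later in this diff: `$STATUS_SYSTEM_URL` is tried first and `$EXPORTER_URL` is only a fallback. A one-function sketch of that resolution:

``` rust
use std::env;

// $STATUS_SYSTEM_URL wins when both vars are set; $EXPORTER_URL is the fallback.
fn export_url() -> Result<String, env::VarError> {
    env::var("STATUS_SYSTEM_URL").or_else(|_| env::var("EXPORTER_URL"))
}
```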
2. Build the project with:
``` bash
cargo build --release
```

3. Run it
> **Debug** version
``` bash
cargo run --bin api-grub
```
or
> **Release** version
``` bash
cargo run --release --bin api-grub
```
## Current progress

| Crate (submodule) | Progress |
|---|---|
|`api-grub` | ✅✅✅✅✅✅✅✅✅🛠️ |
|`config-delivery` [migrated] | ❌❌❌❌❌❌❌❌❌❌ |
|`api-grub` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
|`config-delivery` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
|`preproc` [temp-deprecated] | ❌❌❌❌❌❌❌❌❌❌ | (development temporarily paused)
|`preproc` [temp-deprecated] | ✅✅✅❌❌❌❌❌❌❌ | (development temporarily paused)

crates/api-grub/Cargo.toml
@@ -1,20 +1,15 @@
[package]
name = "api-grub"
version = "1.0.15"
version = "0.3.13"
edition = "2021"
authors = ["Vladislav Drozdov <maseeeeeeeed@gmail.com>"]
description = "API poller for ZVKS project"
homepage = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/api-grub"
repository = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/api-grub"
license = "MIT OR Apache-2.0"
keywords = ["api", "grub", "zvks"]
publish = ["kellnr"]

[dependencies]
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.135"
tokio = { version = "1.43.0", features = ["full"] }
integr-structs = { version = ">=0.1.0", path = "../integr-structs" }
integr-structs = { path = "../integr-structs" }
env_logger = "0.11.6"
log = "0.4.25"
anyhow = "1.0.95"
chrono = "0.4.39"
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }

@@ -25,8 +20,3 @@ md5 = "0.7.0"
rand = "0.9.0"
sysinfo = "0.33.1"
openssl = { version = "0.10", features = ["vendored"] }
tracing-subscriber = "0.3.19"
tracing = "0.1.41"
lazy_static = "1.5.0"
futures = "0.3.31"
async-stream = "0.3.6"

crates/api-grub/src/config.rs
@@ -1,8 +1,9 @@
// mod to communicate with the api-grub config file
// 1) check for changes on the unix-socket
// 2) save changes to the local config file
use integr_structs::api::ApiConfigV2;
use anyhow::{Error, Ok, Result};
use tracing::{info, warn, error};
use log::{info, warn, error};
use std::{fs, path::Path};
use serde_json::from_str;
use tokio::{io::AsyncReadExt, net::UnixListener};

@@ -14,23 +15,19 @@ use integr_structs::api::v3::Config;
const CONFIG_PATH: &str = "config_api.json";
const SOCKET_PATH: &str = "api-grub.sock";

// TODO: rewrite to use current_exe
/// # Fn `pull_local_config`
///
/// ## one-time pull of the local config by reading it directly
///
/// ### Dev-Info :
///
/// *input* : -
///
/// *output* : `anyhow::Result<integr_structs::api::v3::Config>`
///
/// *initiator* : fn `main`
///
/// *managing* : -
///
// todo! rewrite to use current_exe
pub async fn pull_local_config() -> Result<Config> {
    // let conf_path = std::env::current_exe()?;
    let path = Path::new(CONFIG_PATH);
    // return match conf_path.parent() {
    //     Some(dir) => {
    //         let config: ApiConfig = from_str(
    //             &fs::read_to_string(dir.join(CONFIG_PATH))?
    //         )?;
    //         Ok(config)
    //     },
    //     None => Err(Error::msg("No local conf was found"))
    // }
    if path.exists() && path.is_file() {
        let config: Config = from_str(
            &fs::read_to_string(CONFIG_PATH)?
@@ -42,29 +39,14 @@ pub async fn pull_local_config() -> Result<Config> {
}

// for config pulling
/// # Fn `init_config_grub_mechanism`
///
/// ## initializes Unix-Socket listening for grabbing new configs
///
/// ### Dev-Info :
///
/// *input* : `&tokio::sync::mpsc::Sender<integr_structs::api::v3::Config>`
///
/// *output* : `anyhow::Result<()>`
///
/// *initiator* : fn `main`
///
/// *managing* : -
///
/// *depends on* : `const SOCKET_PATH`
///
// ++++ reader to channel
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
    info!("Initializing Unix-Socket listening for pulling new configs...");
    let server = init_unix_listener().await?;

    info!("Listening on the Unix-Socket...");
    let mut buffer = String::new();

    loop {
        if let Ok((mut stream, _)) = server.accept().await {
            if let Err(er) = stream.read_to_string(&mut buffer).await {

@@ -87,43 +69,12 @@ pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
}

// saving new config locally
/// # Fn `save_new_config`
///
/// ## saves a new config locally
///
/// ### Dev-Info :
///
/// *input* : `&String | &str`
///
/// *output* : `anyhow::Result<()>`
///
/// *initiator* : fn `main`
///
/// *managing* : -
///
/// *depends on* : `const CONFIG_PATH`
///
async fn save_new_config(config: &String) -> Result<()> {
    fs::write(CONFIG_PATH, config)?;
    Ok(())
}

/// # Fn `init_unix_listener`
///
/// ## creates the Unix-Socket listener
///
/// ### Dev-Info :
///
/// *input* : -
///
/// *output* : `anyhow::Result<tokio::net::UnixListener>`
///
/// *initiator* : fn `init_config_grub_mechanism`
///
/// *managing* : -
///
/// *depends on* : `const SOCKET_PATH`
///
async fn init_unix_listener() -> Result<UnixListener> {
    let _ = fs::remove_file(SOCKET_PATH);
    Ok(UnixListener::bind(SOCKET_PATH)?)
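For context, a hypothetical client (not part of the repo) that feeds a new config into this listener: it connects to `api-grub.sock`, writes the JSON, and shuts down the write half so the `read_to_string` above sees EOF. The payload shape is only a placeholder for a serialized `Config`.

``` rust
use tokio::{io::AsyncWriteExt, net::UnixStream};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut stream = UnixStream::connect("api-grub.sock").await?;
    // Placeholder payload; a real client would send a serialized `Config`.
    stream.write_all(br#"{"config": []}"#).await?;
    // Half-closing signals EOF so the listener's read_to_string returns.
    stream.shutdown().await?;
    Ok(())
}
```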
crates/api-grub/src/export.rs
@@ -1,48 +1,16 @@
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
use integr_structs::api::v3::{PrometheusMetrics, PrometheusMetricsExtended};
use integr_structs::api::v3::PrometheusMetrics;
use reqwest::Client;
use tokio_postgres::NoTls;
use std::env;
use anyhow::Result;
use tracing::{debug, error, info, trace};
use std::ops::Drop;
use anyhow::{Result};
use log::{info, error};

/// An entity which handles DB connections.
///
/// This struct is a layer between the API-grab workers
/// and the `pool` of DB connections used to save grabbing process statuses
/// (now `PostgreSQL`, slowly migrating to `ClickHouse` support)
///
/// # Examples
///
/// ```
/// use api_grub::export::Exporter;
///
/// let exporter = Exporter::init();
///
/// assert!(exporter.get_connection_from_pool().await.is_some());
/// ```
///
/// # Hint:
///
/// - use the `export_data` method to export metrics to the DB
///
/// - use the `export_metrics` method to export metrics (`PrometheusMetrics`
/// type) to the Prometheus exporter (`$EXPORTER_URL`)
///
/// - use the `export_extended_metrics` method to export metrics (
/// `*PrometheusMetricsExtended*` type with a `desc` field) to the
/// Prometheus exporter (`$EXPORTER_URL`)
pub struct Exporter {
    pool : Option<Pool>,
}

impl Exporter {
    /// Fills a `deadpool_postgres::Config` object with values from the env vars:
    /// - `DB_HOST`
    /// - `DB_DBNAME`
    /// - `DB_USER`
    /// - `DB_PASSWORD`
    fn config_construct() -> Result<Config> {
        let mut cfg = Config::new();
        cfg.host = Some(env::var("DB_HOST")?);

@@ -51,9 +19,6 @@ impl Exporter {
        cfg.password = Some(env::var("DB_PASSWORD")?);
        Ok(cfg)
    }
    /// Uses the `deadpool_postgres::Config` object to create a DB connection
    /// pool shared between async tasks, restricting the number of parallel
    /// connections
    fn pool_construct() -> Option<Pool> {
        return match Self::config_construct() {
            Ok(config) => {
@@ -69,73 +34,38 @@ impl Exporter {
            },
        }
    }
    /// Checks if the DB connection pool is empty
    #[allow(unused)]
    pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
    pub fn init() -> Self {
        Self {
            pool : Self::pool_construct(),
        }
    }
    /// Shares a connection `deadpool_postgres::Client as PgClient`
    ///
    /// Awaits until it can return `Option<deadpool_postgres::Client as PgClient>`
    #[allow(unused)]
    pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
        if let Some(pool) = &self.pool {
            return Some(pool.get().await.ok()?);
        }
        None
    }
    /// Exports data in `&str` jsonb format to the DB using a connection from the pool
    #[allow(unused)]
    #[tracing::instrument(name = "PostgreSQL export")]
    pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
        // client.
        let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
        let _ = client.query(&query, &[&metrics]).await?;
        Ok(())
    }
    /// Exports metrics in `PrometheusMetrics` format to the Exporter defined
    /// by the env var $EXPORTER_URL
    #[tracing::instrument(name = "Prometheus export")]
    pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
        let url = env::var("EXPORTER_URL")?;

        debug!("Exporting: {:?}", &metrics);

        // let req = Request::new(Method::PUT,
        //     Url::parse(metrics)?);
        // dbg!(&metrics);
        let req = Client::new()
            .post(url)
            .json(&metrics)
            .send().await;

        req?;
        Ok(metrics.get_bytes_len())
    }
    /// Exports metrics in `PrometheusMetricsExtended` format to the Exporter defined
    /// by the env var $EXPORTER_URL
    #[tracing::instrument(name = "Prometheus/Status System export")]
    pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
        // let url = env::var("EXPORTER_URL")?;
        let url = env::var("STATUS_SYSTEM_URL").or_else(|err| {
            trace!("cannot fetch $STATUS_SYSTEM_URL var due to {}. working only with Prometheus exporter link", err);
            env::var("EXPORTER_URL")
        })?;

        debug!("Exporting: {:?}", &metrics);

        let req = Client::new()
            .post(&url)
            .json(&metrics)
            .send().await;
        // dbg!(&req);
        // dbg!(&req.unwrap().text().await);
        // todo : rewrite with status code wrapping
        req?;
        Ok(metrics.get_bytes_len())
    }

}

impl Drop for Exporter {
    // Custom destructor to log deinitializing of the `Exporter`
    fn drop(&mut self) {
        info!("Deinitializing Exporter and DB connection pool ...")
    }
}
crates/api-grub/src/jitter.rs
@@ -1,319 +0,0 @@
use std::collections::{HashMap, HashSet};
use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended};
use std::sync::Arc;
use reqwest::Client;
use lazy_static::lazy_static;
use tracing::{error, info, trace, warn};
use std::pin::Pin;
use serde_json::{Value, from_str};
use crate::export::Exporter;
use async_stream::stream;
use futures::{future, pin_mut, stream::Stream, StreamExt};

lazy_static! {
    static ref VINTEO_BASE: String = std::env::var("VINTEO_URL_BASE").unwrap_or_else(|_| String::from("https://demo.vcs.vinteo.dev"));
    static ref CONFERENCES_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_CONFERENCES").unwrap_or_else(|_| String::from("/api/v1/conferences"));
    static ref USERS_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_PARTICIPANTS").unwrap_or_else(|_| String::from("/api/v1/participants/"));
}

// conference ids
type Conferences = HashSet<(String, String)>;
type OutputConferences = HashMap<(String, String), Value>;

#[derive(Debug, Clone)]
struct ConferencesDigital(OutputConferences);

impl ConferencesDigital {
    pub fn new(item: OutputConferences) -> Self {
        Self(item)
    }
    pub fn go_across(self) -> impl Stream<Item = ((String, String), Arc<Value>)> {
        stream! {
            for (key, value) in self.0 {
                info!("Conference {} ({}) is processing ...", key.0, key.1);
                yield (key, Arc::new(value));
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
        }
    }
}

// to handle async http requests
#[derive(Debug)]
struct Requester {
    base: String,
    api_key: String,
    client: Client,
}

impl Requester {
    pub fn new() -> anyhow::Result<Self> {
        Ok(Self {
            base: VINTEO_BASE.clone().to_owned(),
            api_key: std::env::var("VINTEO_API_KEY")?,
            client: Client::builder().user_agent("api-grub").build()?,
        })
    }
    pub async fn get_conferences(&self) -> anyhow::Result<Value> {
        trace!("getting conferences list from API");
        let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
        let req = self.client.get(url)
            .timeout(tokio::time::Duration::from_secs(10))
            .header("accept", "application/json")
            .header("x-api-key", &self.api_key);
        let resp = req.send().await;
        Ok(from_str(&resp?.text().await?)?)
    }
    pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
        trace!("extracting conferences list");
        let mut hashset = Conferences::new();
        match conferences.get("data") {
            None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
            Some(data) => {
                match data.get("conferences") {
                    None => return Err(anyhow::Error::msg("Invalid JSON format: no `conferences` field")),
                    Some(confs) => {
                        if let Some(confs) = confs.as_array() {
                            confs.iter()
                                .for_each(|conf_obj| {
                                    if let Some(id) = conf_obj.get("number") {
                                        if let Some(id) = id.as_str() {
                                            let id = {
                                                if let Some(desc) = conf_obj.get("description") {
                                                    match desc.as_str() {
                                                        Some(desc) => (desc.to_string(), id.to_string()),
                                                        None => (String::new(), id.to_string())
                                                    }
                                                } else {
                                                    (String::new(), id.to_string())
                                                }
                                            };
                                            hashset.insert(id);
                                        }
                                    }
                                });
                        } else {
                            return Err(anyhow::Error::msg("Invalid JSON format: `conferences` is not an array"))
                        }
                    },
                }
            },
        }
        Ok(hashset)
    }
    pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> {
        trace!("getting users list for each conference from API ...");
        let mut output = OutputConferences::new();
        for conf in conferences {
            let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
            let req = self.client.get(url)
                .timeout(tokio::time::Duration::from_secs(10))
                .header("accept", "application/json")
                .header("x-api-key", &self.api_key);

            match req.send().await {
                Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er),
                Ok(resp) => {
                    match resp.text().await {
                        Err(er) => error!("Cannot convert response into text: {}", er),
                        Ok(resp) => {
                            let resp: Value = from_str(&resp)?;
                            output.entry(conf).or_insert(resp);
                        }
                    }
                },
            }
        }
        Ok(output)
    }
}

fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ {
    trace!("fetching participants for conf and going across ...");
    let participants_iter = conf
        .get("data")
        .and_then(|data| data.get("participants"))
        .and_then(|participants| participants.as_array())
        .into_iter()
        .flatten()
        .cloned();

    stream! {
        for participant in participants_iter {
            yield Arc::new(participant);
        }
    }
}

async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> {
    trace!("extracting {} measures for current participant in current conference {} ...", targets.len(), conf_id);
    let mut futures: Vec<Pin<Box<dyn futures::Future<Output = Option<MetricOutputExtended>> + Send>>> = Vec::new();
    for (target, description) in targets {
        futures.push(Box::pin(extract_from_participant(
            parts.clone(),
            target,
            conf_id.clone(),
            conf_desc.clone(),
            description
        )));
    }
    future::join_all(futures).await
}

// GET participants/{conference_id} data.total
async fn extract_total_participants(conferences : Arc<Value>) -> Value {
    if let Some(total) = conferences
        .get("data")
        .and_then(|data| data.get("total")) {
        return total.clone()
    }
    Value::Null
}

async fn extract_total_anon_participants(conferences : Arc<Value>) -> Value {
    let sum = conferences
        .get("data")
        .and_then(|data| data.get("participants"))
        .and_then(|parts| parts.as_array())
        .iter()
        .flat_map(|&val| val.iter())
        .filter_map(|part| part.get("isAnonymous"))
        .filter_map(|is_anon| is_anon.as_bool())
        .filter(|is_anon| *is_anon == true)
        .count();
    Value::Number(sum.into())
}

// GET participants/{conference_id} data.participants[].isAnonymous
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> {
    trace!("extracting `{}` measure for current participant ...", target);
    if let Some(value) = participant.get("params")
        .and_then(|params| params.get(target)) {
        let name = participant.get("watermark").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
        let id = participant.get("id").unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "unknown");
        let metric_name = format!("{}_{}_{}", target, conf_id, id);
        return Some(
            MetricOutputExtended::new_with_slices(
                &metric_name,
                &metric_name,
                "type",
                "Vinteo native",
                format!(
                    "{} для пользователя {} ({}) в конференции {} ({})",
                    description,
                    name,
                    id,
                    conf_desc,
                    conf_id
                ).as_ref(),
                Some(18),
                Some("module$12".to_string()),
                value.clone()
            )
        )
    }
    None
}

pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
    // create requester
    let requester = Requester::new()?;
    // get conference ids
    loop {
        info!("Getting conferences array ...");
        let conferences = requester.get_conferences().await?;
        let conferences = Requester::get_conferences_from_value(conferences).await?;

        info!("Getting jitter metrics by user in each conference ...");
        let confs = requester.get_users_jitter_by_conference(conferences).await?;

        info!("Initializing extraction mechanism ...");
        let conferences = ConferencesDigital::new(confs);
        let extracted_conference = conferences.go_across();
        let mut buffer: Vec<MetricOutputExtended> = Vec::new();
        pin_mut!(extracted_conference);

        while let Some(((desc, id), item)) = extracted_conference.next().await {
            let parts_stream = across_participants(&item);
            pin_mut!(parts_stream);

            let total_participants = MetricOutputExtended::new_with_slices(
                format!("TotalParticipants_{}", id).as_ref(),
                format!("TotalParticipants_{}", id).as_ref(),
                "type",
                "Vinteo Native",
                format!("Общее количество участников в конференции {}", &desc).as_ref(),
                Some(18),
                Some(String::from("module$12")),
                extract_total_participants(item.clone()).await
            );
            // anon NON_STABLE
            let total_anon_participants = MetricOutputExtended::new_with_slices(
                format!("TotalAnonymousParticipants_{}", id).as_ref(),
                format!("TotalAnonymousParticipants_{}", id).as_ref(),
                "type",
                "Vinteo Native",
                format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(),
                Some(18),
                Some(String::from("module$12")),
                extract_total_anon_participants(item.clone()).await
            );
            // description

            buffer.push(total_participants);
            buffer.push(total_anon_participants);

            while let Some(participant) = parts_stream.next().await {
                buffer.append(
                    &mut gather_futures(
                        participant,
                        Arc::from(id.as_ref()),
                        Arc::from(desc.as_ref()),
                        vec![
                            ("txBitrate", "Скорость отправки данных"),
                            ("rxBitrate", "Скорость приёма данных"),
                            ("txFPS", "Количество отправляемых кадров в секунду"),
                            ("rxFPS", "Количество получаемых кадров в секунду"),
                            ("rxLost", "Количество потерянных пакетов на приёме"),
                            ("txLost", "Количество потерянных пакетов на отправке"),
                            ("jBLen", "Фазовое отклонение на цифровом канале"),
                            ("txLostPercent", "Процент потерянных пакетов на отправке"),
                            ("rxLostPercent", "Процент потерянных пакетов на приёме"),
                            ("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"),
                            ("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"),
                            ("txVideoCodec", "Используемый видеокодек на отправке"),
                            ("rxVideoCodec", "Используемый видеокодек на приёме"),
                            ("txAudioCodec", "Используемый аудиокодек на отправке"),
                            ("rxAudioCodec", "Используемый аудиокодек на приёме"),
                            ("txResolution", "Разрешение передаваемого видео"),
                            ("rxResolution", "Разрешение принимаемого видео"),
                            ("txH239Resolution", "Разрешение дополнительного видео-потока"),
                            ("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"),
                            ("duration", "Длительность сеанса (в минутах)"),
                        ],
                    ).await
                    .into_iter()
                    .filter(|metric| metric.is_some())
                    .map(|metric| metric.unwrap())
                    .collect::<Vec<MetricOutputExtended>>()
                );
            }
        }

        // let metrics = Requester::get_metrics_from_jitter(jitter).await;

        if !buffer.is_empty() {
            let metrics = PrometheusMetricsExtended::new_zvks(buffer).await;
            match Exporter::export_extended_metrics(metrics).await {
                Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
                Err(er) => error!("Cannot export data: {}", er),
            }
        } else {
            warn!("Metrics array is empty. Ignoring exporting ...");
        }
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
    // get users' jitter info
}
crates/api-grub/src/json.rs
@@ -2,21 +2,6 @@
use serde_json::{json, Value};
use integr_structs::api::v3::{Metric, MetricOutput};

/// A JSON-parser struct
///
/// Used for extracting metrics from the server response
/// within the metrics mechanism
///
/// # Example
///
/// ```
/// use api_grub::json::JsonParser;
/// use integr_structs::api::v3::Metric;
///
/// let json = br#"{"flat1" : { "room1" : { "rt_tempo" : "+16" }}}"#.to_vec();
///
/// assert!(!JsonParser::parse(vec![Metric::template()], json).is_empty());
/// ```
pub struct JsonParser;

impl JsonParser {
crates/api-grub/src/logger.rs
@@ -1,38 +1,25 @@
use std::str::FromStr;
use chrono::Local;
use env_logger::Builder;
use log::LevelFilter;
use std::io::Write;
use anyhow::Result;
use tracing::info;
use log::info;

/// # Fn `setup_logger`
///
/// ## function to init the terminal logger
///
/// ### Dev-Info :
///
/// *input* : -
///
/// *output* : `anyhow::Result<()>`
///
/// *initiator* : fn `main`
///
/// *managing* : -
///
/// *depends on* : -
///
pub async fn setup_logger() -> Result<()> {
    let log_level = std::env::var("IM_LOG_INFO").unwrap_or_else(|_| String::from("INFO"));

    tracing_subscriber::fmt()
        .with_max_level(
            tracing::Level::from_str(&log_level)
                .unwrap_or_else(|_| tracing::Level::INFO))
        .with_writer(std::io::stdout)
        .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NEW)
        // .with_timer(Local::now().format("%d-%m-%Y %H:%M:%S"))
        .with_line_number(false)
        .with_target(false)
        .with_file(false)
        .compact()
        .init();
    Builder::new()
        .format(move |buf, record| {
            writeln!(
                buf,
                "|{}| {} [{}] - {}",
                "api-grubber",
                Local::now().format("%d-%m-%Y %H:%M:%S"),
                record.level(),
                record.args(),
            )
        })
        .filter(None, LevelFilter::Info)
        .target(env_logger::Target::Stdout)
        .init();

    info!("Logger configured");
    Ok(())

@@ -42,17 +29,22 @@ pub async fn setup_logger() -> Result<()> {
#[cfg(test)]
mod logger_unittests {
    use tokio::test;
    use super::*;
    #[test]
    async fn check_logger_builder() {
        tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .with_test_writer()
            .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NEW)
            // .with_timer(Local::now().format("%d-%m-%Y %H:%M:%S"))
            .with_line_number(false)
            .with_target(false)
            .with_file(false)
            .compact()
        Builder::new()
            .format(move |buf, record| {
                writeln!(
                    buf,
                    "|{}| {} [{}] - {}",
                    "api-grubber",
                    Local::now().format("%d-%m-%Y %H:%M:%S"),
                    record.level(),
                    record.args(),
                )
            })
            .filter(None, LevelFilter::Info)
            .target(env_logger::Target::Stdout)
            .init();
    }
}
crates/api-grub/src/main.rs
@@ -4,19 +4,17 @@ mod logger;
mod json;
mod export;
mod monitoring;
mod jitter;

use anyhow::Result;
// use integr_structs::api::ApiConfigV2;
use integr_structs::api::v3::Config;
use logger::setup_logger;
// use log::{info, warn};
use config::{pull_local_config, init_config_grub_mechanism};
use net::init_api_grub_mechanism;
use tokio::sync::mpsc;
use tracing::{error, info, warn};
// ENODE_MONITORING_IP
use log::{error, info, warn};
use monitoring::get_metrics_from_monitoring;
// VINTEO_API_KEY
use jitter::init_grubbing_jitter;

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()>{

@@ -25,6 +23,10 @@ async fn main() -> Result<()>{
    let config = get_config().await;
    // config update channel
    let (tx, mut rx) = mpsc::channel::<Config>(1);
    // futures
    // todo : rewrite with spawn
    // let config_fut = init_config_grub_mechanism(&tx);
    // let grub_fut = init_api_grub_mechanism(config, &mut rx);

    let event_config = tokio::spawn(async move {
        match init_config_grub_mechanism(&tx).await {

@@ -36,30 +38,24 @@ async fn main() -> Result<()>{
            },
        }
    });
    let request_delay = std::env::var("IM_REQUEST_DELAY")
        .unwrap_or_else(|_| String::from("5"))
        .parse::<u32>()
        .unwrap_or_else(|_| {
            warn!("No delay was set, setting it to 5 secs ..."); 5
        });
    let event_grub = tokio::spawn(async move {
        // GRAB USING eNODE.MONITORING API GATEWAY
        if std::env::var("ENODE_MONITORING_IP").is_ok() {
            match get_metrics_from_monitoring(0, request_delay as usize).await {
                Ok(_) => info!("Grabbing (eNODE.Monitoring) task de-initialized"),
                Err(er) => error!("Grabbing task returned an error : {}", er),
            match get_metrics_from_monitoring(0, 5).await {
                Ok(_) => {
                    info!("Grabbing (eNODE.Monitoring) task deinitialized");
                },
                Err(er) => {
                    error!("Grabbing task returned an error : {}", er);
                },
            }
        // JITTER NATIVE GRAB TASK
        } else if std::env::var("VINTEO_API_KEY").is_ok() {
            match init_grubbing_jitter().await {
                Ok(_) => info!("Grabbing (Vinteo - Jitter native) task de-initialized"),
                Err(er) => error!("Jitter grabbing mechanism crashed : {}", er),
            }
        // NATIVE GRAB TASK USING `config_api.json`
        } else {
            match init_api_grub_mechanism(config, &mut rx).await {
                Ok(_) => info!("Grabbing task de-initialized"),
                Err(er) => error!("Grabbing task returned an error : {}", er),
                Ok(_) => {
                    info!("Grabbing task deinitialized");
                },
                Err(er) => {
                    error!("Grabbing task returned an error : {}", er);
                },
            }
        }
    });

@@ -67,6 +63,7 @@ async fn main() -> Result<()>{
    for event in events_handler {
        let _ = event.await;
    }
    // let _ = tokio::join!(config_fut, grub_fut);

    Ok(())
}
crates/api-grub/src/monitoring.rs
@@ -4,387 +4,133 @@ use serde_json::{Map, Value};
use reqwest::Client;
use tokio::sync::Semaphore;
use std::sync::Arc;
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, get_chunk_size};
// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl};
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size};
// use crate::structs::cmdb::Query;
use integr_structs::api::enode_monitoring::cmdb::Query;
use tokio::task::JoinHandle;
// use crate::structs::get_chunk_size;
use std::pin::Pin;
use std::future::Future;
use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
use tracing::{error, info, warn};
use std::collections::HashMap;
use integr_structs::api::v3::{MetricOutput, PrometheusMetrics};
use log::{error, info, warn};
// use chrono::{Local, DateTime};

trait AsDeviceRequest {
    fn as_devices(self) -> Vec<String>;
}
trait IntoEnodeRequset {
    fn into_enode_request(self) -> String;
}
impl AsDeviceRequest for Vec<String> {
    fn as_devices(mut self) -> Vec<String> {
        self.iter_mut()
            .for_each(|dev| *dev = format!("/measures/device${}", dev));
        self
    }
}

#[derive(Debug)]
struct MetricInstance {
    dola_id : String,
    name : String,
    desc : String,
    device : String,
    source : String,
}
impl IntoEnodeRequset for &[MetricInstance] {
    // builds a percent-encoded JSON array of `dola_id`s:
    // %5B = `[`, %22 = `"`, %5D = `]`
    fn into_enode_request(self) -> String {
        let mut vec: Vec<String> = Vec::new();
        vec.push("%5B".to_owned());
        self.iter()
            .enumerate()
            .for_each(|(id, val)| {
                if id > 0 {
                    vec.push(",".to_owned());
                }
                vec.push(format!("%22{}%22", val.dola_id));
            });
        vec.push("%5D".to_owned());
        vec.concat()
    }
}
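A worked example of the encoding above, written as a hypothetical unit test (the ids are made up, not from the repo): the pushes produce the percent-escaped form of a JSON array of `dola_id`s.

``` rust
#[cfg(test)]
mod enode_request_tests {
    use super::*;

    #[test]
    fn encodes_ids_as_percent_escaped_json_array() {
        let metrics = [
            MetricInstance::new("measure$18", "m18", "", "18", "src"),
            MetricInstance::new("measure$19", "m19", "", "19", "src"),
        ];
        // decodes to ["measure$18","measure$19"]
        assert_eq!(
            metrics.as_slice().into_enode_request(),
            "%5B%22measure$18%22,%22measure$19%22%5D"
        );
    }
}
```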
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MetricMeta {
|
||||
name : String,
|
||||
desc : String,
|
||||
device : String,
|
||||
source : String,
|
||||
}
|
||||
|
||||
impl Default for MetricMeta {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
name : String::new(),
|
||||
desc : String::new(),
|
||||
device : String::new(),
|
||||
source : String::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(private_interfaces)]
|
||||
pub trait LazyUnzipInstance {
|
||||
fn lazy_unzip(&self) -> HashMap<String, MetricMeta>;
|
||||
}
|
||||
|
||||
impl LazyUnzipInstance for &[MetricInstance] {
|
||||
fn lazy_unzip(&self) -> HashMap<String, MetricMeta> {
|
||||
self.iter().map(
|
||||
|obj|
|
||||
(
|
||||
obj.dola_id.to_string(),
|
||||
MetricMeta::new(
|
||||
&obj.name,
|
||||
&obj.desc,
|
||||
&obj.device,
|
||||
&obj.source
|
||||
)
|
||||
)
|
||||
).collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricInstance {
|
||||
fn new(id : &str, name : &str, desc : &str, device : &str, source : &str) -> Self {
|
||||
Self {
|
||||
dola_id : id.to_owned(),
|
||||
name : name.to_owned(),
|
||||
desc : desc.to_owned(),
|
||||
device : device.to_owned(),
|
||||
source : source.to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricMeta {
|
||||
fn new(name : &str, desc : &str, device : &str, source : &str) -> Self {
|
||||
Self {
|
||||
name : name.to_owned(),
|
||||
desc : desc.to_owned(),
|
||||
device : device.to_owned(),
|
||||
source : source.to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// # Fn `get_metrics_from_monitoring`
|
||||
///
|
||||
/// A function to init pulling and exporting metrics mechanism
|
||||
/// from CM to Exporter. It spawns async tasks to get measures
|
||||
/// and their values and then extract needed info to export
|
||||
///
|
||||
/// ### Dev-Info :
|
||||
///
|
||||
/// *input* : duration and delay as `usize` (in secs)
|
||||
///
|
||||
/// *output* : `anyhow::Result<()>`
|
||||
///
|
||||
/// *initiator* : fn `main`
|
||||
///
|
||||
/// *managing* : runtime of N async tasks (N - count of chunks)
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use api-grub::monitoring::get_metrics_from_monitoring;
|
||||
///
|
||||
/// // exec func without time restriction but with delay in 5 secs
|
||||
/// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(()));
|
||||
/// ```
|
||||
///
|
||||
#[tracing::instrument(name = "cm_fn_initiator", skip_all)]
|
||||
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
||||
|
||||
let timer = tokio::time::Instant::now();
|
||||
let mut a = MonitoringImporter::new().await;
|
||||
'outer: loop {
|
||||
// let mut a = MonitoringImporter::new().await;
|
||||
let mut a = MonitoringImporter::new().await;
|
||||
a.start_session().await?;
|
||||
tracing::debug!("CM creds struct - {:#?}", a);
|
||||
|
||||
let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
|
||||
tracing::debug!("Measures Vec - {:#?}", vec);
|
||||
|
||||
'inner: loop {
|
||||
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
|
||||
break 'outer;
|
||||
}
|
||||
if vec.len() == 0 || a.get_measure_info(vec.clone()).await.is_err() {
|
||||
let vec = a.get_metrics_list().await.unwrap_or_else(|_| vec![]);
|
||||
if vec.is_empty() {
|
||||
warn!("Session dropped, creating new ...");
|
||||
break 'inner;
|
||||
}
|
||||
let _ = a.get_measure_info(Arc::new(vec)).await;
|
||||
// a.close_session().await?;
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// An entity which handle CM creds
|
||||
///
|
||||
/// Used to capture measures and there values, to preprocess all measures to
|
||||
/// relevant Exporter's structure
|
||||
///
|
||||
/// # Example:
|
||||
///
|
||||
/// ```
|
||||
/// use api-grub::monitoring::MonitoringImporter;
|
||||
///
|
||||
/// let mut a = MonitoringImporter::new().await;
|
||||
/// a.start_session().await?;
|
||||
/// let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
|
||||
///
|
||||
/// assert_eq!(a.get_measure_info(vec.clone()).await, Ok(()));
|
||||
/// ```
|
||||
///
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MonitoringImporter {
|
||||
ip : String,
|
||||
login : String,
|
||||
password : String,
|
||||
access_token : String,
|
||||
api_token : String,
|
||||
ts : String,
|
||||
timeout : usize,
|
||||
}
|
||||
|
||||
impl MonitoringImporter {
|
||||
/// The most simple constructor for `MonitoringImporter`
|
||||
///
|
||||
/// Returns `Self` object that is constructing according to
|
||||
/// env vars:
|
||||
/// - `ENODE_MONITORING_IP`
|
||||
/// - `ENODE_MONITORING_LOGIN`
|
||||
/// - `ENODE_MONITORING_PASSWORD`
|
||||
///
|
||||
/// If env vars will not be set, it returns `Self` with
|
||||
/// empty fields
|
||||
///
|
||||
pub async fn new() -> Self {
|
||||
MonitoringImporter {
|
||||
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
|
||||
login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()),
|
||||
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
|
||||
access_token : String::new(),
|
||||
api_token : env::var("ENODE_API_TOKEN").unwrap_or_else(|_| String::new()),
|
||||
ts : String::new(),
|
||||
timeout : std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()).parse().unwrap_or_else(|_| 10)
|
||||
}
|
||||
}
|
||||
/// Function that checks is current `MonitoringImporter` valid
|
||||
/// and can be used to pull and push info to and from CM
|
||||
///
|
||||
async fn is_valid(&self) -> bool {
|
||||
!self.ip.is_empty() && ((!self.login.is_empty() && !self.password.is_empty() ) || !self.api_token.is_empty())
|
||||
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
|
||||
}
|
||||
/// Function that checks is current `MonitoringImporter` valid
|
||||
/// and can be used to pull and push info to and from CM
|
||||
///
|
||||
async fn is_minimal(&self) -> bool {
|
||||
(self.login.is_empty() || self.password.is_empty()) && !self.api_token.is_empty()
|
||||
}
|
||||
/// A setter of `timestamp`
|
||||
///
|
||||
/// This function is needed to set a `timestamp` after
|
||||
/// CM session creation.
|
||||
///
|
||||
/// This `timestamp` is a date of creation a session
|
||||
/// on the CM Server
|
||||
async fn set_ts(&mut self, ts: &str) {
|
||||
self.ts = ts.to_owned();
|
||||
}
|
||||
/// A function for creation CM session
|
||||
///
|
||||
/// Returns OK(()) if session was created and there were
|
||||
/// no errors (neither internal no external)
|
||||
///
|
||||
/// *Also* it saves ts and access-key in it's runtime environment,
|
||||
/// there's no way to get access-key of session
|
||||
#[tracing::instrument(name = "cm_fn_session_start", skip_all)]
|
||||
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
||||
if !self.is_valid().await {
|
||||
if self.is_minimal().await {
|
||||
return Err(Error::msg(format!("Given API-Token is no more actual now ({})", &self.access_token)));
|
||||
}
|
||||
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
||||
}
|
||||
if !self.api_token.is_empty() {
|
||||
std::mem::swap(&mut self.access_token, &mut self.api_token);
|
||||
info!("API-Token that was in the ENODE configuration was set as access-token");
|
||||
} else {
|
||||
let client = Client::new();
|
||||
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
||||
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
||||
let mut delay = 1;
|
||||
|
||||
loop {
|
||||
let client = client
|
||||
.post(&url)
|
||||
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&fortoken);
|
||||
if let Ok(resp) = client.send().await {
|
||||
match resp.json::<AuthResponse>().await {
|
||||
Ok(auth) => {
|
||||
self.set_ts(&fortoken.ts).await;
|
||||
self.access_token = auth.access_token.to_owned();
|
||||
tracing::trace!("Access key was changed");
|
||||
break;
|
||||
},
|
||||
Err(er) => error!("Error with extracting access-key from CM response due to {}", er),
|
||||
}
|
||||
}
|
||||
error!("Error while trying to create a new session, waiting {} secs and retrying ...", delay);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
|
||||
delay = delay * 2;
|
||||
}
|
||||
info!("Started a new CM session");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A function for pulling measures list
|
||||
///
|
||||
/// Used with actual credentials for current CM session
|
||||
/// and returning measures in format of `Ok(Vec<(String, String)>)`
|
||||
/// , where `(String, String)` is a tuple of measure `id` and `description`
|
||||
/// (`name`)
|
||||
#[tracing::instrument(name = "CM get metrics list mechanism", skip_all)]
|
||||
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<MetricInstance>> {
|
||||
tracing::trace!("Trying ti get measures list from CM ...");
|
||||
let client = Client::new();
|
||||
let mut vec: Vec<MetricInstance> = Vec::new();
|
||||
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
||||
let id_list = {
|
||||
match std::env::var("ENODE_TARGET_DEVICES") {
|
||||
Err(_) => vec![String::from("18"), String::from("19")],
|
||||
Ok(var) => var.split(',').into_iter().map(|st| st.trim().to_string()).collect::<Vec<String>>(),
|
||||
}
|
||||
};
|
||||
let list_of_devices = id_list.clone().as_devices();
|
||||
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
||||
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
||||
// dbg!(&fortoken);
|
||||
let client = client
|
||||
.post(url)
|
||||
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
|
||||
.header("Content-Type", "application/json")
|
||||
.bearer_auth(&self.access_token)
|
||||
.json(&Query::device_oriented(list_of_devices));
|
||||
.json(&fortoken);
|
||||
let resp = client.send().await?;
|
||||
let auth = resp.json::<AuthResponse>().await?;
|
||||
// dbg!(&auth);
|
||||
self.set_ts(&fortoken.ts).await;
|
||||
|
||||
self.access_token = auth.access_token.to_owned();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
pub async fn close_session(&mut self) -> anyhow::Result<()> {
|
||||
let client = Client::new();
|
||||
let url = format!("http://{}/e-data-front/auth/logout", self.ip);
|
||||
let client = client
|
||||
.post(url)
|
||||
.header("Content-Type", "application/json")
|
||||
.header("access-token", &self.access_token);
|
||||
|
||||
let _ = client.send().await?;
|
||||
|
||||
self.access_token.clear();
|
||||
Ok(())
|
||||
}
    pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<String>> {
        let client = Client::new();
        let mut vec: Vec<String> = Vec::new();
        let url = format!("http://{}/e-cmdb/api/query", self.ip);
        let client = client
            .post(url)
            .header("Content-Type", "application/json")
            .header("access-token", &self.access_token)
            .json(&Query::default());
        let resp = client.send().await?.text().await?;
        // dbg!(&resp.text().await);
        let resp: Value = serde_json::from_str(&resp)?;

        if let Some(arr) = resp.as_array() {
            for device in arr {
                let device_id = {
                    match device.get("name") {
                        Some(name) => {
                            match serde_json::to_string(name) {
                                Ok(name) => {
                                    name.split('$').last().unwrap_or("undefined-device").to_owned()
                                },
                                Err(_) => "undefined-device".to_string(),
                            }
                        },
                        None => "undefined-device".to_string(),
                    }
                };
                let device_id = device_id.trim_end_matches('"');
                if let Some(links) = device.get("links") {
                    if let Some(measures) = links.as_array() {
                        for measure in measures.iter() {
                            let dola_id = measure.get("id");
                            let id = measure.get("measure_id");
                            let source = measure.get("source_id");
                            let desc = measure.get("name");
                            if id.is_some() && source.is_some() && dola_id.is_some() {
                                let dola_id = format!("measure${}", dola_id.unwrap().as_i64().unwrap_or(0));
                                let id = id.unwrap().as_str().unwrap_or("no-name");
                                let source = source.unwrap().as_str().unwrap_or("no-source");
                                let desc = desc.unwrap_or(&Value::Null).as_str().unwrap_or("no description");
                                if source.is_empty() {
                                    return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`measure_id` or `source_id`)"));
                                }
                                vec.push(MetricInstance::new(&dola_id, id, desc, device_id.as_ref(), source));
                            }
                        }
            for measure in arr {
                let id = measure.get("id");
                let cls = measure.get("cls");
                if id.is_some() && cls.is_some() {
                    // TODO: later, wait for Vaitaliy's call on classification
                    let id = id.unwrap().as_i64().unwrap_or_default();
                    let cls = cls.unwrap().as_str().unwrap_or("");
                    if cls.is_empty() {
                        return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
                    }
                    // let measure_name = format!("{}${}", cls, id);
                    vec.push(format!("{}${}", cls, id));
                }
            }
            // dbg!(vec);
        } else {
            return Err(Error::msg("Invalid JSON in response"));
        }
        info!("List of measures was pulled, total - {}", &vec.len());
        Ok(vec)
    }
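
    // Illustrative only (values assumed, not from the source): the response
    // shape the parser above expects, inferred from its field accesses --
    // an array of devices, each with a `$`-delimited `name` and a `links`
    // array whose entries carry `id`, `measure_id`, `source_id` and `name`:
    //
    //     [
    //       {
    //         "name": "host$switch-01",
    //         "links": [
    //           { "id": 7, "measure_id": "cpu_load", "source_id": "snmp", "name": "CPU load" }
    //         ]
    //       }
    //     ]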

    /// A function to get realtime data
    ///
    /// It pulls info about one measure or a slice of measures and
    /// exports all data to the Prometheus exporter
    ///
    /// # How it works
    /// 1) creates a restriction on the max count of async
    /// tasks (`tokio::sync::Semaphore`)
    ///
    /// 2) divides the vec of measures into chunks with
    /// sizes chosen to balance its own and the server's load
    ///
    /// 3) spawns async grabber tasks to get measures info, each of which
    /// exports all data by itself
    ///
    #[tracing::instrument(name = "CM get measures info mechanism", skip_all)]
    pub async fn get_measure_info(&self, measures: Arc<Vec<MetricInstance>>) -> anyhow::Result<()> {
        tracing::trace!("Trying to get info about each measure ...");
    pub async fn get_measure_info(&self, measures: Arc<Vec<String>>) -> anyhow::Result<()> {
        let mut sys = sysinfo::System::new();
        sys.refresh_cpu_all();
        // adaptive permit on task spawn to prevent system overload

@ -393,102 +139,69 @@ impl MonitoringImporter {
        let client = Arc::new(Client::new());
        let measures = measures.clone();
        let arc = Arc::new(self.clone());
        let chunk_size = get_chunk_size(measures.len());
        info!("List of measures was divided into chunks of len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size);
        // dbg!(&measures.display());

        for measure in measures.chunks(chunk_size) {
            // dbg!(&measures.len());
        for measure in measures.chunks(get_chunk_size(measures.len())) {
            let permit = sem.clone();
            let arc = arc.clone();
            let client = client.clone();
            let hm = measure.lazy_unzip();
            let measure = Arc::new(measure.into_enode_request());
            let measure = Arc::new(measure.display());

            let _permit = permit.acquire().await.unwrap();

            let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
                Self::process_endpoint(
                    measure.clone(),
                    client.clone(),
                    arc.clone(),
                    &hm,
                ).await
            let jh: JoinHandle<anyhow::Result<PrometheusMetrics>> = tokio::spawn(async move {
                Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await

            });
            jh_vec.push(jh);
        }

        // let mut vals = Vec::new();
        for event in jh_vec {
            match event.await {
                Ok(val) => {
                    match crate::export::Exporter::export_extended_metrics(val?).await {
                        Ok(bytes) => {info!("Successfully transmitted {} bytes", bytes)},
                        Err(er) => error!("Cannot export data due to : `{}`", er),
                    if let Ok(val) = val {
                        match crate::export::Exporter::export_metrics(val).await {
                            Ok(bytes) => info!("Successfully transmitted {} bytes to the Prometheus exporter", bytes),
                            Err(er) => error!("Cannot export data to the Prometheus exporter due to : `{}`", er),
                        }
                        // vals.push(val);
                    }
                },
                Err(er) => {
                    println!("Fatal error on async task: {}", er);
                    return Err(anyhow::Error::msg(format!("Fatal error on async task: {}", er)))
                },
                Err(er) => println!("Fatal error on async task: {}", er),
            }
        }
        // dbg!(&vals);
        // dbg!(&vals.len());
        Ok(())
    }
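
    // The bounded-concurrency pattern described above, as a minimal
    // standalone sketch (names assumed, not part of this crate): a
    // `Semaphore` caps how many grabber tasks run at once while the
    // chunks are spawned.
    //
    //     let sem = Arc::new(tokio::sync::Semaphore::new(max_tasks));
    //     let mut handles = Vec::new();
    //     for chunk in measures.chunks(chunk_size) {
    //         let permit = sem.clone().acquire_owned().await?;
    //         let request = build_request(chunk); // hypothetical helper
    //         handles.push(tokio::spawn(async move {
    //             let _permit = permit; // held for the task's lifetime
    //             process(request).await
    //         }));
    //     }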

    /// An async grabber task
    ///
    /// Used to create a request to the CM server and
    /// get all measure(s) data
    ///
    /// # Also
    /// The argument `measure: Arc<String>` can be a single measure like `measure$1` or
    /// a slice of measures in the special format `%5B%22measure$1%22,%20%22measure$2%22%5D`.
    /// This is necessary to handle two types of requests and URL restrictions
    ///
    async fn process_endpoint(
        measure: Arc<String>,
        client: Arc<Client>,
        arc: Arc<Self>,
        hm: &HashMap<String, MetricMeta>,
    ) -> anyhow::Result<PrometheusMetricsExtended> {
        tracing::trace!("Processing CM endpoint with one or more measure names");
    async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>) -> anyhow::Result<PrometheusMetrics> {
        let resp = client
            .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
            .timeout(tokio::time::Duration::from_secs(arc.timeout as u64))
            .header("Content-Type", "application/json")
            .bearer_auth(&arc.access_token)
            .header("access-token", &arc.access_token)
            .send().await?
            .text().await?;
        tokio::task::yield_now().await;

        let resp: Value = serde_json::from_str(&resp)?;
        // let a = Self::extract_metric_data(resp);

        Ok(
            PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
            PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await
        )
    }
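
    // For illustration (host assumed): the two request shapes the doc
    // comment above describes, as final URLs.
    //
    //     single measure: http://<ip>/e-nms/mirror/measure/measure$1
    //     measure slice:  http://<ip>/e-nms/mirror/measure/%5B%22measure$1%22,%20%22measure$2%22%5D
    //                     (the URL-encoded form of `["measure$1", "measure$2"]`)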

    /// A recursive extractor of data
    ///
    /// Takes the target CM-server JSON response as a `Value` and a HashMap of
    /// measures' `id`s and their corresponding `description`s
    ///
    /// # How it works
    /// 1) if `Value` is an `Object` -> executes `Self::process_value` on it and
    /// returns the result of the function as a `Vec`
    ///
    /// 2) if `Value` is an `Array` -> calls itself for each part of the array
    /// and aggregates all data in the `Vec` by using `.append(&mut Vec<...>)`
    ///
    /// 3) if `Value` is `_` -> returns the error **Invalid JSON format**
    ///
    fn extract_metric_data(json: Value, hm: &HashMap<String, MetricMeta>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
    fn extract_metric_data(json: Value) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutput>>> + Send>> {
        Box::pin(async move {
            return match json {
                Value::Object(obj) => {
                    return Ok(vec![Self::process_value(&obj, hm).await?])
                    // let resp: Value = serde_json::from_str(&obj)?;
                    return Ok(vec![Self::process_value(&obj).await?])
                },
                Value::Array(arr) => {
                    let mut vec = Vec::new();
                    for obj in arr {
                        if let Ok(mut val) = Self::extract_metric_data(obj, hm).await {
                        if let Ok(mut val) = Self::extract_metric_data(obj).await {
                            // vec.push(val);
                            vec.append(&mut val);
                        }

@ -499,64 +212,43 @@ impl MonitoringImporter {
            }
        })
    }
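
    // Why the boxed return type above: an `async fn` cannot call itself
    // directly (its future type would be infinitely sized), so the recursion
    // is wrapped in `Pin<Box<dyn Future>>`. A minimal standalone sketch of
    // the same pattern (counts JSON leaves; names assumed):
    //
    //     fn count_leaves(v: Value) -> Pin<Box<dyn Future<Output = usize> + Send>> {
    //         Box::pin(async move {
    //             match v {
    //                 Value::Array(arr) => {
    //                     let mut n = 0;
    //                     for item in arr { n += count_leaves(item).await; }
    //                     n
    //                 }
    //                 _ => 1,
    //             }
    //         })
    //     }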
    async fn process_value(obj : &Map<String, Value>) -> anyhow::Result<MetricOutput> {
        let id = obj.get("id");
        let val = obj.get("value");

    /// A function-extractor for a single measure object
    ///
    /// Searches for certain fields and aggregates them into the
    /// `MetricOutputExtended` object
    async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, MetricMeta>) -> anyhow::Result<MetricOutputExtended> {
        tracing::trace!("Processing atomic Object value in CM JSON-response");
        let id = obj.get("$id");
        let val = obj.get("value");

        if id.is_none() || val.is_none() {
            return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
        }

        let id = id.unwrap().as_str().unwrap_or("");
        let default_meta = MetricMeta::default();
        let meta = hm.get(id).unwrap_or(&default_meta);
        let id = id.replace('$', "_");
        let val = val.unwrap();
        let device = meta.device.parse::<usize>().unwrap_or(0);

        if id.is_empty() {
            return Err(Error::msg("Empty `id` field. Invalid JSON response"))
        }
        Ok(MetricOutputExtended::new_with_slices(
            id.as_ref(),
            &meta.name,
            {
                match val {
                    Value::Number(val) => {
                        if val.is_i64() {
                            "i64"
                        } else if val.is_u64() {
                            "u64"
                        } else {
                            "f64"
                        }
                    },
                    _ => "unknown",
        if id.is_none() || val.is_none() {
            return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
        }
                }
            },
            "enode.monitoring.api",
            &meta.desc,
            Some(device),
            Some(meta.source.clone()),
            val.clone(),
        ))
    }
}
        let id = id.unwrap().as_str().unwrap_or("");
        let val = val.unwrap();

impl std::fmt::Debug for MonitoringImporter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MonitoringImporter")
            .field("ip", &self.ip)
            .field("login", &self.login)
            .field("password", &"****")
            .field("access_key", &"HIDDEN")
            .field("ts", &self.ts)
            .finish()
        if id.is_empty() {
            return Err(Error::msg("Empty `id` field. Invalid JSON response"))
        }
        // pub struct MetricOutput {
        //     pub id : String,
        //     #[serde(rename = "type")]
        //     json_type : String,
        //     addr : String,
        //     pub value : Value,
        // }

        Ok(MetricOutput {
            id : id.to_owned(),
            json_type : match val {
                Value::Number(val) => {
                    if val.is_i64() {
                        "i64".to_owned()
                    } else if val.is_u64() {
                        "u64".to_owned()
                    } else {
                        "f64".to_owned()
                    }
                },
                _ => "unknown".to_owned(),
            },
            addr : "enode.monitoring.api".to_owned(),
            value : val.clone()
        })
    }
}
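
// For illustration: how the `json_type` detection above classifies
// serde_json numbers (values assumed).
//
//     json!(-5)        -> "i64"     (fits a signed 64-bit integer)
//     json!(u64::MAX)  -> "u64"     (only unsigned values too big for i64)
//     json!(2.5)       -> "f64"     (any float)
//     json!("x")       -> "unknown" (non-numeric values)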


@ -1,18 +1,22 @@
// module to handle unix-socket connection + pulling info from api

use anyhow::Result;
use tracing::{error, info};
// use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
use log::{error, info};
use rand::random;
use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, Duration};
use reqwest::Client;
use reqwest::{Client, Method};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use tokio::task::JoinHandle;
// use tokio::sync::Mutex;
use dotenv::dotenv;
use crate::json::JsonParser;
use crate::export::Exporter;
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
// use md5::compute;

// type BufferType = Arc<Mutex<Vec<String>>>;

// for api info pulling
pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {

@ -38,13 +42,32 @@ pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>)
        }
        let shared_pool = shared_pool.clone();
        info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
        // sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
    }
}
    // Ok(())
}


struct RestMethod;

impl RestMethod {
    pub async fn from_str(method: &str) -> Method {
        return match method.trim().to_lowercase().as_str() {
            "post" => Method::POST,
            "patch" => Method::PATCH,
            "put" => Method::PUT,
            "delete" => Method::DELETE,
            "head" => Method::HEAD,
            "trace" => Method::TRACE,
            "options" => Method::OPTIONS,
            "connect" => Method::CONNECT,
            // "get" and any unknown verb fall back to GET
            _ => Method::GET
        }
    }
}
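
// For illustration (hypothetical call sites): mapping config strings to
// reqwest methods; whitespace and case are ignored, unknown verbs give GET.
//
//     assert_eq!(RestMethod::from_str(" POST ").await, Method::POST);
//     assert_eq!(RestMethod::from_str("foo").await, Method::GET);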

struct ApiPoll<'a> {
    config : &'a mut Config,
    #[allow(unused)]
    client : Client,
}


@ -55,18 +78,26 @@ impl<'a> ApiPoll<'a> {
            client : Client::new(),
        }
    }
    // may be fragile and buggy; needs testing
    pub async fn change_config(&mut self, conf: Config) {
        *self.config = conf;
    }
    pub async fn is_default(&self) -> bool {
        self.config.is_default().await
    }
    // pub async fn get_delay(&self) -> u32 {
    //     self.config.timeout
    // }
    pub async fn process_metrics(
        service_id: Arc<String>,
        metrics: Arc<Metrics>,
        creds: Credentials,
        // exporter: Arc<Exporter>
    ) -> Result<()> {
        // processing metrics
        // let mut req = Client::new()
        //     // .user_agent("api_grub/integration_module")
        //     .get(&metrics.url);
        use std::hash::DefaultHasher;

        let rand = random::<char>();

@ -82,23 +113,42 @@ impl<'a> ApiPoll<'a> {
        let api_key = &creds.endpoint.api_key;

        if !login.is_empty() && !password.is_empty() {
            // dbg!("kjgbkasgksjd");
            req = req.basic_auth(login, Some(password));
        }
        if !api_key.is_empty() {
            // req = req.bearer_auth(&api_key);
            // req = req.header("authorization", "bearer ");

            req = req.header("accept", "application/json");
            req = req.header("x-api-key", api_key);

            // req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
        }
        // dbg!(&req);
        // let (client, res) = req.build_split();
        // let res = res.unwrap();
        // res.url_mut().is_special()



        // dbg!(client);
        // dbg!(res);
        // todo!();

        match req.send().await {
            Ok(resp) => {
                // dbg!(&resp.text().await);
                if let Ok(response) = resp.text().await {
                    match serde_json::to_value(&response) {
                        Err(er) => {
                            error!("Bad JSON in response. Error: {}", er);
                        },
                        Ok(_) => {
                            let endpoint_name = &metrics.name;
                            let preproc = JsonParser::parse(&metrics.measure, &response);
                            let preproc = PrometheusMetrics::new(&service_id, preproc);
                            // dbg!(serde_json::to_string_pretty(&preproc));
                            let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
                            match Exporter::export_metrics(preproc).await {
                                Ok(bytes) => {
                                    info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);

@ -120,15 +170,18 @@ impl<'a> ApiPoll<'a> {
        Ok(())
    }
    pub async fn process_endpoint(
        // client : Arc<Client>,
        config : Arc<ConfigEndpoint>,
        creds : Credentials,
        // exporter : Arc<Exporter>
    ) -> Result<()> {
        // TODO: HAS TO BE USED
        let _period = config.get_period().unwrap_or(0);
        //
        let period = config.get_period().unwrap_or(0);
        let timeout = config.get_timeout().unwrap_or(5);
        let metrics = Arc::new(config.metrics.clone());
        let service_id = Arc::new(config.id.clone());
        loop {
            // let exporter = exporter.clone();
            let creds = creds.clone();
            let metrics = metrics.clone();
            let service_id = service_id.clone();

@ -158,23 +211,28 @@ impl<'a> ApiPoll<'a> {
            // processing
            sleep(Duration::from_secs(timeout)).await
        }
        Ok(())
    }
    pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
        // let buffer: BufferType = Arc::new(Mutex::new(vec![]));
        // let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
        // let client = Arc::new(self.client.clone());
        let config = Arc::new(self.config.clone());
        let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
        let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];

        for (idx, _) in config.config.iter().enumerate() {
            // let for_creds = endpoints[idx].clone();
            let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
            let endpoint = endpoints[idx].clone();

            // TODO: use the exporter
            #[allow(unused)]
            // let client = client.clone();
            let exporter = exporter.clone();
            let join_handler = tokio::spawn(async move {
                Self::process_endpoint(
                    // client,
                    endpoint,
                    creds,
                    // exporter.clone()
                ).await
            });
            join_handles.push(join_handler);

@ -187,7 +245,6 @@ impl<'a> ApiPoll<'a> {
    }
}

// TODO: FIX TESTS
// #[cfg(test)]
// mod net_unittests {
//     use super::*;

@ -2,16 +2,9 @@
name = "integr-structs"
version = "0.1.0"
edition = "2021"
description = "Structs for API poller in ZVKS project"
homepage = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/integr-structs"
repository = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/integr-structs"
license = "MIT OR Apache-2.0"
keywords = ["api", "grub", "zvks", "structs", "contracts"]
publish = ["kellnr"]

[dependencies]
anyhow = "1.0.95"
chrono = "0.4.40"
dotenv = "0.15.0"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.135"


@ -160,16 +160,6 @@ pub mod v3 {
        pub json_type : String,
        pub addr : String,
    }
    impl Metric {
        pub fn template() -> Self {
            Self {
                id : "room_tempo".to_string(),
                json_type : "String".to_string(),
                addr : "flat1.room1.rt_tempo".to_string(),
            }
        }
    }

    #[derive(Serialize, Deserialize, Clone, Debug)]
    pub struct Metrics {
        pub name : String,

@ -253,7 +243,7 @@ pub mod v3 {
        pub value : Value,
    }
    impl MetricOutput {
        pub fn new_with_slices(id : &str, json_type : &str, addr: &str, value : Value) -> Self {
        pub fn new_with_slices(id : &str, json_type : &str, addr: &str,value : Value) -> Self {
            MetricOutput {
                id : id.to_string(),
                json_type : json_type.to_string(),

@ -262,82 +252,25 @@ pub mod v3 {
            }
        }
    }
    #[derive(Serialize, Deserialize, Debug)]
    pub struct MetricOutputExtended {
        pub id : String,
        pub name : String,
        #[serde(rename = "type")]
        pub json_type : String,
        pub addr : String,
        pub value : Value,
        #[serde(rename = "description")]
        pub desc : String,
        pub status: usize,
        pub device: Option<usize>,
        pub source: Option<String>,
    }
    impl MetricOutputExtended {
        pub fn new_with_slices(id : &str, name: &str, json_type : &str, addr: &str, desc : &str, device: Option<usize>, source: Option<String>, value : Value) -> Self {
            MetricOutputExtended {
                id : id.to_string(),
                name : name.to_string(),
                json_type : json_type.to_string(),
                addr : addr.to_string(),
                value,
                desc : desc.to_string(),
                device,
                source,
                status: 0,
            }
        }
    }

    #[derive(Serialize, Deserialize, Debug)]
    pub struct PrometheusMetrics {
        pub service_name: String,
        // pub endpoint_name: String,
        pub endpoint_name: String,
        pub metrics: Vec<MetricOutput>,
    }
    impl PrometheusMetrics {
        pub fn new(service: &str, metrics: Vec<MetricOutput>) -> Self {
        pub fn new(service: &str, endpoint: &str, metrics: Vec<MetricOutput>) -> Self {
            Self {
                service_name: service.to_string(),
                // endpoint_name: endpoint.to_string(),
                endpoint_name: endpoint.to_string(),
                metrics
            }
        }
        pub async fn new_zvks(metrics: Vec<MetricOutput>) -> Self {
            Self {
                service_name : "zvks".to_owned(),
                // endpoint_name : "apiforsnmp".to_owned(),
                metrics,
            }
        }
        pub fn get_bytes_len(&self) -> usize {
            let str_metrics = serde_json::to_vec(self).unwrap_or_else(
                |_| Vec::new()
            );
            str_metrics.len()
        }
    }
    #[derive(Serialize, Deserialize, Debug)]
    pub struct PrometheusMetricsExtended {
        pub service_name: String,
        pub metrics: Vec<MetricOutputExtended>,
    }
    impl PrometheusMetricsExtended {
        pub fn new_empty_jitter() -> Self {
            Self {
                service_name : "zvks".to_owned(),
                metrics : Vec::new(),
            }
        }
        pub fn add(&mut self, metric: MetricOutputExtended) {
            self.metrics.push(metric);
        }
        pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
            Self {
                service_name : "zvks".to_owned(),
                endpoint_name : "apiforsnmp".to_owned(),
                metrics,
            }
        }

@ -352,8 +285,6 @@ pub mod v3 {


pub mod enode_monitoring {
    use std::hash::Hash;

    use super::*;

    #[derive(Debug, Serialize)]

@ -380,18 +311,18 @@ pub mod enode_monitoring {
    pub struct Query {
        id : Vec<String>,
        data : Data,
        // #[serde(rename = "postQuery")]
        // post_query : String,
        #[serde(rename = "postQuery")]
        post_query : String,
        #[serde(rename = "enableActions")]
        enable_actions : bool,
        ts : usize
    }
    impl Query {
        pub fn device_oriented(devices: Vec<String>) -> Self {
    impl Default for Query {
        fn default() -> Self {
            Self {
                id : devices,
                id : vec!["/measures/device$18".to_owned()],
                data : Data::default(),
                // post_query : "links".to_owned(),
                post_query : "links".to_owned(),
                enable_actions : false,
                ts : 1740060679399

@ -419,7 +350,7 @@ pub mod enode_monitoring {
    fn default() -> Self {
        Self {
            flatten : true,
            filter : Filter::default(),
            filter : Filter::default()
        }
    }
}

@ -436,6 +367,7 @@ pub mod enode_monitoring {
    }
}

// "{\"access_token\":\"5BNQsmiGFQRNA651HeQxZekYgYUAWZ4e\",\"role\":\"administrator\",\"startRT\":\"2025-02-25T09:03:27.581Z\",\"login\":\"admin\",\"push_active\":null,\"$id\":\"systemuser$1\"}",
#[derive(Debug, Deserialize)]
pub struct AuthResponse {
    pub access_token : String,

@ -451,14 +383,7 @@ pub mod enode_monitoring {
    fn display(&self) -> String;
}

pub trait LazyUnzip<K, V>
where
    V : Clone,
    K : Hash + Eq + Clone {
    fn lazy_unzip(&self) -> HashMap<K, V>;
}

impl<T> GenericUrl for [(T, T)]
impl<T> GenericUrl for [T]
where T : Display {
    fn display(&self) -> String {
        let mut vec: Vec<String> = Vec::new();

@ -469,29 +394,23 @@ pub mod enode_monitoring {
            if id > 0 {
                vec.push(",".to_owned());
            }
            vec.push(format!("%22{}%22", val.0));
            vec.push(format!("%22{}%22", val));
        });
        vec.push("%5D".to_owned());
        vec.concat()
    }
}
impl<K, V> LazyUnzip<K, V> for [(K, V)]
where
    V : Clone,
    K : Hash + Eq + Clone {
    fn lazy_unzip(&self) -> HashMap<K, V> {
        let mut hm = HashMap::new();
        self.iter()
            .for_each(|(key, val)| {hm.insert(key.to_owned(), val.to_owned());});
        hm
    }
}
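
// For illustration (hypothetical values): collecting metric meta pairs into
// a lookup map with `lazy_unzip`.
//
//     let pairs = [("measure$1".to_owned(), "CPU load".to_owned())];
//     let hm = pairs.lazy_unzip(); // HashMap<String, String>
//     assert_eq!(hm["measure$1"], "CPU load");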

pub fn get_chunk_size(total_measures: usize) -> usize {
    match total_measures {
        0..=432 => total_measures,
        433..=1008 => total_measures / 2,
        _ => total_measures / 4,
        0..=144 => total_measures,
        145..=288 => total_measures / 4,
        289..=432 => total_measures / 5,
        433..=576 => total_measures / 6,
        577..=720 => total_measures / 7,
        721..=864 => total_measures / 8,
        865..=1008 => total_measures / 9,
        _ => total_measures / 10,
    }
}
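
// For illustration: how the two tier tables in the diff above differ for
// 300 measures. The coarse three-arm version keeps them in one chunk of
// 300 (a single request); the fine nine-arm version gives 300 / 5 = 60 per
// chunk, i.e. five smaller requests spread over the same measures.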


@ -0,0 +1,14 @@
[package]
name = "preproc"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.95"
chrono = "0.4.39"
dotenv = "0.15.0"
env_logger = "0.11.6"
log = "0.4.25"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.137"
tokio = { version = "1.43.0", features = ["full"] }

@ -0,0 +1,21 @@
// mod for preproc config pulling and updating




#[cfg(test)]
mod config_unittests {
    use tokio::test;

    #[test]
    async fn create_unix_socket_server() { assert!(true) }

    #[test]
    async fn verify_on_valid_config() { assert!(true) }

    #[test]
    async fn verify_on_invalid_config() { assert!(true) }
}

@ -0,0 +1,50 @@
use chrono::Local;
use env_logger::Builder;
use log::LevelFilter;
use std::io::Write;
use anyhow::Result;
use log::info;

pub async fn setup_logger() -> Result<()> {
    Builder::new()
        .format(move |buf, record| {
            writeln!(
                buf,
                "|{}| {} [{}] - {}",
                "config-delivery",
                Local::now().format("%d-%m-%Y %H:%M:%S"),
                record.level(),
                record.args(),
            )
        })
        .filter(None, LevelFilter::Info)
        .target(env_logger::Target::Stdout)
        .init();

    info!("Logger configured");
    Ok(())
}


#[cfg(test)]
mod logger_unittests {
    use tokio::test;
    use super::*;
    #[test]
    async fn check_logger_builder() {
        Builder::new()
            .format(move |buf, record| {
                writeln!(
                    buf,
                    "|{}| {} [{}] - {}",
                    "config-delivery",
                    Local::now().format("%d-%m-%Y %H:%M:%S"),
                    record.level(),
                    record.args(),
                )
            })
            .filter(None, LevelFilter::Info)
            .target(env_logger::Target::Stdout)
            .init();
    }
}

@ -0,0 +1,18 @@
mod config;
mod transform;
mod logger;

use logger::setup_logger;
use dotenv::dotenv;
use anyhow::Result;
use log::info;

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
    setup_logger().await?;

    info!("Pulling env vars from the .env file if it exists ...");
    dotenv().ok();

    Ok(())
}

@ -0,0 +1 @@
// mod for preprocessing metrics data and transferring it to the CM