Compare commits

..

No commits in common. "master" and "feature/1163" have entirely different histories.

14 changed files with 163 additions and 827 deletions

View File

@ -1,4 +0,0 @@
/target
Cargo.lock
*.sock
.env

View File

@ -1,47 +1,22 @@
# Template .env for API grabber
# PostgreSQL connection [DEPRECATED]
# -------------------------------
DB_HOST = "ip.addr.postgresql.server"
DB_USER = "db_user"
DB_PASSWORD = "db_user_password"
DB_DBNAME = "db_name"
# Prometheus-Exporter info
# -------------------------------
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
# VINTEO Jitter puller (needed to init Jitter native grab)
# -------------------------------
VINTEO_URL_BASE = "http(s)://ip.ip.ip.ip:port"
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/to/something"
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/to/something"
VINTEO_API_KEY = "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"
# Status Model API support
# > if exists, ignore `EXPORTER_URL` var
STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input"
# eNODE.Monitoring configuration
# -------------------------------
# eNODE.Monitoring server IP
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# eNODE.Monitoring credentials
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # admin password is required
# List of target devices
ENODE_TARGET_DEVICES = "18, 19"
# to work with unlimit API-Token
ENODE_API_TOKEN = "sssswwwwaaaaffff"
# OPTIONAL SETTINGS
# -------------------------------
# IM configuration for max level of logging info
# for example DEBUG, INFO, WARN, ERROR, TRACE
IM_LOG_INFO = "INFO"
# IM configuration for setting up API connection
# timeout (in secs). Default value - 10
IM_CONNECTION_TIMEOUT = "10"
# IM configuration for delay of requests
# delay (in secs). Default value - 5
IM_REQUEST_DELAY = "20"
IM_CONNECTION_TIMEOUT = "10"

View File

@ -5,11 +5,7 @@ RUN apt update && apt install -y musl-tools
RUN rustup target add x86_64-unknown-linux-musl
COPY . .
ENV CARGO_HTTP_DEBUG=true
ENV CARGO_HTTP_TIMEOUT=100
RUN cargo test --verbose
RUN cargo test
RUN cargo build --release --target=x86_64-unknown-linux-musl
FROM alpine:latest

37
Jenkinsfile vendored
View File

@ -34,22 +34,7 @@ pipeline {
}
steps {
script {
def hasTags = sh(script: "git tag -l | wc -l", returnStdout: true).trim().toInteger() > 0
def lastVersion = "0.0.0"
if (hasTags) {
lastVersion = sh(script: "git describe --tags --abbrev=0", returnStdout: true).trim()
}
echo "Last version: ${lastVersion}"
def (major, minor, patch) = lastVersion.tokenize('.')
def newVersion = "${major}.${minor}.${patch.toInteger() + 1}"
echo "New version: ${newVersion}"
env.IMAGE_TAG = newVersion
env.NEW_VERSION = newVersion
env.IMAGE_TAG = sh(script: "git describe --tags --abbrev=0", returnStdout: true).trim()
}
}
}
@ -94,30 +79,18 @@ pipeline {
echo "Attempting to merge PR ${env.CHANGE_ID} into master..."
withCredentials([usernamePassword(credentialsId: 'gitea_creds', usernameVariable: 'GITEA_USER', passwordVariable: 'GITEA_PASS')]) {
def prId = env.CHANGE_ID
sh """
curl -X POST \
-u "${GITEA_USER}:${GITEA_PASS}" \
-H "Content-Type: application/json" \
-d '{"do":"merge"}' \
http://git.entcor/api/v1/repos/deployer3000/${env.IMAGE_NAME}/pulls/${prId}/merge
http://git.entcor/api/v1/repos/deployer3000/integration-module/pulls/${prId}/merge
"""
def commitHash = sh(script: "git rev-parse HEAD~1", returnStdout: true).trim() // необходим для корректного отображения статусов
echo "PR ${prId} merged successfully into master!"
sleep(time: 15, unit: 'SECONDS')
sh "git checkout master && git pull origin master"
sh """
curl -v -X POST -u "${GITEA_USER}:${GITEA_PASS}" \
-H "Content-Type: application/json" \
-d '{"tag_name": "${env.NEW_VERSION}", "name": "Release ${env.NEW_VERSION}", "target_commitish": "master"}' \
"${env.GITEA_REPOSITORY_URL}deployer3000/${env.IMAGE_NAME}/releases"
"""
echo "New release succeeded!"
def context = "test-org/${env.IMAGE_NAME}/pipeline/pr-${env.CHANGE_TARGET}"
notify(context, GITEA_USER, GITEA_PASS, env.GITEA_REPOSITORY_URL, env.IMAGE_NAME, commitHash, "success")
def context = "test-org/integration-module/pipeline/pr-${env.CHANGE_TARGET}"
def commitHash = sh(script: "git rev-parse HEAD~1", returnStdout: true).trim()
notify(context, GITEA_USER, GITEA_PASS, env.GITEA_REPOSITORY_URL, "integration-module", commitHash, "success")
}
}
}

164
README.md
View File

@ -1,173 +1,39 @@
# Интеграционный модуль для проекта "Буревестник ВКС"
## Описание
`Интеграционный модуль (ИМ)` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
`integr_mod` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
- получение данных через API ВКС
- поддержку хранения, валидации и актуализации собственных конфигураций
- предобработку полученных данных и сохранение в БД
- интеграции с `еНОД.Мониторинг`
## Специфика работы
На данный момент предусмотрено четыре режима работы:
1) **Нативный** - режим работы, производящий прямой опрос сервиса видео-конференц связи `Vinteo` и соответствующий процесс `ETL`
2) **Статичный** - режим работы *"посредник"*, когда все метрические данные ВКС `Vinteo` получаются через `REST-Full API` средства `еНОД.Мониторинг`
3) **Системный** - аналогичный **статичному** режиму, но метрические данные (заведомо обогащенные нулевым статусом) отправляются не напрямую в модуль `Prometheus exporter`, а в `Статусную модель`
4) **Vinteo** - особый режим работы, предполагающий сбор определенного набора метрик напрямую с ВКС `Vinteo` механизмом многоэтапного `API-запроса`
> **Примечание**
По стандарту `ИМ` работает в **НАТИВНОМ** режиме и ожидает конфигурации в формате `.json`, однако приоритетным считается **СТАТИЧНЫЙ** режим. Подробная информация о настройке в пункте `Руководство`
- предобработку полученных данных и ~~отправку `Системе Мониторинга`~~ сохранение в БД
## Руководство
В данном разделе описан алгоритм настройки, сборки и запуска программного модуля `ИМ`
### Преднастройка
1. Выбор режима работы модуля, который скорректирует принцип настройки:
| Режим работы | .env | config-api.json | $STATUS_SYSTEM_URL | $EXPORTER_URL |
|---|---|---|---|---|
| Нативный | ❌ | ✅ | ❌ | ❌ |
| Статичный | ✅ | ❌ | ❌❌❌ | ✅ |
| Системный | ✅ | ❌ | ✅ | ❌ |
, где:
✅ -- следует настроить (предпринять)
❌ -- игнорируется системой, не стоит настраивать
❌❌❌ -- **НЕЛЬЗЯ** настраивать (предпринимать), возможны ошибки в работе
> Режим работы `Vinteo` *не описан* в таблице **намеренно**
### Настройка режима работы "Нативный"
Для настройки данного режима необходимо расположить в **активной** директории конфигурационный `config_api.json` файл:
``` json
{
"config": [
{
"id":"zvks",
"login" : "",
"pass" : "",
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
"period" : "",
"timeout" : "5",
"metrics" : [
{
"name": "conferences",
"url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
"measure": [
{ "id":"number", "type": "text", "addr": "data.conferences[].number" },
{ "id":"total", "type": "integer", "addr": "data.total" },
{ "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
{ "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
{ "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
]
},
{
"name": "abonents",
"url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
"measure": [
{ "id":"total", "type": "integer", "addr": "data.total" }
]
}
]
}
]
}
```
> **Примечание**
Название конфигурационного файла должно быть как в примере - `config_api.json`
### Настройка режима работы "Статичный"
Для настройки данного режима необходимо пополнить данными о сервере в `.env` файле по примеру:
1. Заполнить .env файл или установить переменные окружения в соответствии с примером в `.env.example` файле
``` toml
...
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port" # <--- экспорт данных (обязательно)
# Template .env for API grabber
# Prometheus-Exporter info
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
# eNODE.Monitoring configuration
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"# ---> получение данных
# admin password is required
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"
# admin password is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
...
```
### Настройка режима работы "Системный"
Для настройки данного режима необходимо пополнить данными о сервере в `.env` файле по примеру:
``` toml
...
STATUS_SYSTEM_URL = "http(s)://{ip}:{port}/api/input"# <--- экспорт данных
# eNODE.Monitoring configuration
ENODE_MONITORING_IP = "{ip}.{ip}.{ip}.{ip}"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"# ---> получение данных
# admin password is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
...
```
### Настройка режима работы "Vinteo"
Для работы в данном режиме необходимо установить переменные окружения в соответствии со списком ниже
``` toml
...
VINTEO_URL_BASE = "https://demo.vcs.vinteo.dev"
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/conferences"
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/participants/"
VINTEO_API_KEY = "00000000000111111111.aaaaaaaaaaaaaaabbbbbbbbbbbbb"
...
```
### Настройка экспорта полученных и обработанных данных
Настройка *точки выхода* для полученных и обработанных метрик определяется установленными в переменных окружения параметрами, варианта два:
1) **Экспорт в статусную модель** в рамках механизма сквозного прохода данных в проекте `Буревестник ВКС`
``` toml
...
STATUS_SYSTEM_URL = "{BASE_URL}/{ROUTE}"
...
```
2) **Экспорт в экспортер или иной потребитель данных**
``` toml
...
EXPORTER_URL = "{BASE_URL}/{ROUTE}"
...
```
> ### **ОЧЕНЬ ВАЖНОЕ ПРИМЕЧАНИЕ**
> ---
> Одновременное использование `$STATUS_SYSTEM_URL` и `$EXPORTER_URL` **НЕДОПУСТИМО** !! Вариант со ссылкой **на статусную модель** является _по стандарту_ **БОЛЕЕ ПРИОРИТЕТНЫМ**, второй затрется, использовать необходимо только один
2. Произвести сборку проекта командой :
``` bash
cargo build --release
```
3. Запустить
> **Debug** версия
> Debug версия
``` bash
cargo run --bin api-grub
```
или
> **Release** версия
> Release версия
``` bash
cargo run --release --bin api-grub
```
@ -176,6 +42,6 @@ cargo run --release --bin api-grub
| Крейт (подмодуль) | Прогресс |
|---|---|
|`api-grub` | ✅✅✅✅✅✅✅✅✅🛠️ |
|`config-delivery` [migrated] | ❌❌❌❌❌❌❌❌❌❌ |
|`config-delivery [migrated]` | ❌❌❌❌❌❌❌❌❌❌ |
|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
|`preproc` [temp-deprecated] | ❌❌❌❌❌❌❌❌❌❌ | (разработка временно остановлена)

View File

@ -1,6 +1,6 @@
[package]
name = "api-grub"
version = "1.0.15"
version = "1.0.2"
edition = "2021"
authors = ["Vladislav Drozdov <maseeeeeeeed@gmail.com>"]
description = "API poller for ZVKS project"
@ -26,7 +26,4 @@ rand = "0.9.0"
sysinfo = "0.33.1"
openssl = { version = "0.10", features = ["vendored"] }
tracing-subscriber = "0.3.19"
tracing = "0.1.41"
lazy_static = "1.5.0"
futures = "0.3.31"
async-stream = "0.3.6"
tracing = "0.1.41"

View File

@ -4,7 +4,7 @@ use reqwest::Client;
use tokio_postgres::NoTls;
use std::env;
use anyhow::Result;
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info};
use std::ops::Drop;
/// An entity which handles DB connections.
@ -113,18 +113,13 @@ impl Exporter {
}
/// Exports metrics in `PrometheusMetricsExtended` format to Exporter defined
/// as env var $EXORPTER_URL
#[tracing::instrument(name = "Prometheus/Status System export")]
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
// let url = env::var("EXPORTER_URL")?;
let url = env::var("STATUS_SYSTEM_URL").or_else(|err| {
trace!("cannot fetch $STATUS_SYSTEM_URL var due to {}. working only with Prometheus exporter link", err);
env::var("EXPORTER_URL")
})?;
let url = env::var("EXPORTER_URL")?;
debug!("Exporting: {:?}", &metrics);
let req = Client::new()
.post(&url)
.post(url)
.json(&metrics)
.send().await;
req?;

View File

@ -1,319 +0,0 @@
use std::collections::{HashMap, HashSet};
use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended};
use std::sync::Arc;
use reqwest::Client;
use lazy_static::lazy_static;
use tracing::{error, info, trace, warn};
use std::pin::Pin;
use serde_json::{Value, from_str};
use crate::export::Exporter;
use async_stream::stream;
use futures::{future, pin_mut, stream::Stream, StreamExt};
lazy_static! {
    // Base URL of the Vinteo VCS server; defaults to the public demo instance.
    static ref VINTEO_BASE: String = std::env::var("VINTEO_URL_BASE").unwrap_or_else(|_| String::from("https://demo.vcs.vinteo.dev"));
    // Endpoint returning the list of active conferences.
    static ref CONFERENCES_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_CONFERENCES").unwrap_or_else(|_| String::from("/api/v1/conferences"));
    // Endpoint prefix for one conference's participants; the conference number is appended.
    static ref USERS_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_PARTICIPANTS").unwrap_or_else(|_| String::from("/api/v1/participants/"));
}
// Conference identifiers: a set of (description, number) pairs.
type Conferences = HashSet<(String, String)>;
// Raw participants JSON payload keyed by the (description, number) pair of a conference.
type OutputConferences = HashMap<(String, String), Value>;
/// Holder of per-conference participant payloads, ready for lazy streaming.
#[derive(Debug, Clone)]
struct ConferencesDigital(OutputConferences);

impl ConferencesDigital {
    /// Wraps an already-collected map of conference payloads.
    pub fn new(item: OutputConferences) -> Self {
        Self(item)
    }

    /// Consumes the map and yields each `((description, number), payload)`
    /// pair lazily, pausing 10 ms between items to smooth downstream load.
    pub fn go_across(self) -> impl Stream<Item = ((String, String), Arc<Value>)> {
        stream! {
            for (conf_key, payload) in self.0 {
                info!("Conference {} ({}) is processing ...", conf_key.0, conf_key.1);
                yield (conf_key, Arc::new(payload));
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
        }
    }
}
// to handle async http requests
#[derive(Debug)]
struct Requester {
base: String,
api_key: String,
client: Client,
}
impl Requester {
pub fn new() -> anyhow::Result<Self> {
Ok(Self {
base: VINTEO_BASE.clone().to_owned(),
api_key: std::env::var("VINTEO_API_KEY")?,
client: Client::builder().user_agent("api-grub").build()?,
})
}
pub async fn get_conferences(&self) -> anyhow::Result<Value> {
trace!("getting conferences list from API");
let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
let req = self.client.get(url)
.timeout(tokio::time::Duration::from_secs(10))
.header("accept", "application/json")
.header("x-api-key", &self.api_key);
let resp = req.send().await;
Ok(from_str(&resp?.text().await?)?)
}
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
trace!("extracting conferences list");
let mut hashset = Conferences::new();
match conferences.get("data") {
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
Some(data) => {
match data.get("conferences") {
None => return Err(anyhow::Error::msg("Invalid JSON format: no `conferences` field")),
Some(confs) => {
if let Some(confs) = confs.as_array() {
confs.iter()
.for_each(|conf_obj| {
if let Some(id) = conf_obj.get("number") {
if let Some(id) = id.as_str() {
let id = {
if let Some(desc) = conf_obj.get("description") {
match desc.as_str() {
Some(desc) => (desc.to_string(), id.to_string()),
None => (String::new(), id.to_string())
}
} else {
(String::new(), id.to_string())
}
};
hashset.insert(id);
}
}
});
} else {
return Err(anyhow::Error::msg("Invalid JSON format: `conferences` is not an array"))
}
},
}
},
}
Ok(hashset)
}
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> {
trace!("getting users list for each conference from API ...");
let mut output = OutputConferences::new();
for conf in conferences {
let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
let req = self.client.get(url)
.timeout(tokio::time::Duration::from_secs(10))
.header("accept", "application/json")
.header("x-api-key", &self.api_key);
match req.send().await {
Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er),
Ok(resp) => {
match resp.text().await {
Err(er) => error!("Cannot convert response into text: {}", er),
Ok(resp) => {
let resp: Value = from_str(&resp)?;
output.entry(conf).or_insert(resp);
}
}
},
}
}
Ok(output)
}
}
/// Streams every entry of `data.participants` as an owned `Arc<Value>`.
/// Missing/non-array `participants` yields an empty stream.
fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ {
    trace!("fetching participants for conf and going across ...");
    let members = conf
        .get("data")
        .and_then(|d| d.get("participants"))
        .and_then(|p| p.as_array())
        .into_iter()
        .flatten()
        .cloned();
    stream! {
        for member in members {
            yield Arc::new(member);
        }
    }
}
/// Extracts every `(target, description)` measure from one participant
/// concurrently and returns the results in the same order as `targets`.
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> {
    trace!("extracting {} measures for current participant in current conference {} ...", targets.len(), conf_id);
    // All futures come from the same async fn, so they share one concrete
    // type — no Pin<Box<dyn Future>> erasure is needed for join_all.
    let futures = targets.into_iter().map(|(target, description)| {
        extract_from_participant(
            parts.clone(),
            target,
            conf_id.clone(),
            conf_desc.clone(),
            description,
        )
    });
    future::join_all(futures).await
}
// GET participants/{conference_id} data.total
/// Returns the `data.total` field of a participants payload,
/// or `Value::Null` when the field (or `data`) is missing.
async fn extract_total_participants(conferences : Arc<Value>) -> Value {
    if let Some(total) = conferences
        .get("data")
        .and_then(|data| data.get("total")) {
        return total.clone()
    }
    Value::Null
}
/// Counts participants whose `isAnonymous` flag is the boolean `true`.
/// Missing or non-boolean flags are treated as "not anonymous"; a missing
/// or non-array `data.participants` yields a count of 0.
async fn extract_total_anon_participants(conferences : Arc<Value>) -> Value {
    let participants = conferences
        .get("data")
        .and_then(|data| data.get("participants"))
        .and_then(|parts| parts.as_array());
    let anon_count = participants
        .into_iter()
        .flat_map(|parts| parts.iter())
        .filter(|part| {
            part.get("isAnonymous")
                .and_then(|flag| flag.as_bool())
                .unwrap_or(false)
        })
        .count();
    Value::Number(anon_count.into())
}
// GET participants/{conference_id} data.participants[].params.{target}
/// Pulls the single measure `target` out of `participant.params` and wraps it
/// into a `MetricOutputExtended` named `<target>_<conf_id>_<participant id>`.
///
/// Returns `None` when the participant has no `params.<target>` field.
/// `watermark` (display name) and `id` fall back to "unknown".
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> {
    trace!("extracting `{}` measure for current participant ...", target);
    let value = participant.get("params")
        .and_then(|params| params.get(target))?;
    // The fallback arguments are free to construct, so the eager
    // `unwrap_or` forms replace the needless `unwrap_or_else` closures.
    let name = participant.get("watermark").unwrap_or(&Value::Null).as_str().unwrap_or("unknown");
    let id = participant.get("id").unwrap_or(&Value::Null).as_str().unwrap_or("unknown");
    let metric_name = format!("{}_{}_{}", target, conf_id, id);
    Some(
        MetricOutputExtended::new_with_slices(
            &metric_name,
            &metric_name,
            "type",
            "Vinteo native",
            format!(
                "{} для пользователя {} ({}) в конференции {} ({})",
                description,
                name,
                id,
                conf_desc,
                conf_id
            ).as_ref(),
            Some(18),
            Some("module$12".to_string()),
            value.clone()
        )
    )
}
/// Driver of the native Vinteo "jitter" grab mode.
///
/// Endless loop: list conferences -> fetch per-conference participants ->
/// build `MetricOutputExtended` records (per-conference totals plus one
/// record per participant measure) -> export the batch -> sleep 5 s.
///
/// Propagates errors from the listing/extraction stages via `?`; export
/// errors are only logged so a flaky exporter does not kill the loop.
pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
    // create requester (fails fast when $VINTEO_API_KEY is absent)
    let requester = Requester::new()?;
    // get conference id's
    loop {
        info!("Getting conferences array ...");
        let conferences = requester.get_conferences().await?;
        let conferences = Requester::get_conferences_from_value(conferences).await?;
        info!("Getting jitter metrics by user in each conference ...");
        let confs = requester.get_users_jitter_by_conference(conferences).await?;
        info!("Initializing extraction mechanism ...");
        let conferences = ConferencesDigital::new(confs);
        let extracted_conference = conferences.go_across();
        // all metrics of this iteration are accumulated here before export
        let mut buffer: Vec<MetricOutputExtended> = Vec::new();
        pin_mut!(extracted_conference);
        while let Some(((desc, id), item)) = extracted_conference.next().await {
            let parts_stream = across_participants(&item);
            pin_mut!(parts_stream);
            // per-conference total participants (reads `data.total`)
            let total_participants = MetricOutputExtended::new_with_slices(
                format!("TotalParticipants_{}", id).as_ref(),
                format!("TotalParticipants_{}", id).as_ref(),
                "type",
                "Vinteo Native",
                format!("Общее количество участников в конференции {}", &desc).as_ref(),
                Some(18),
                Some(String::from("module$12")),
                extract_total_participants(item.clone()).await
            );
            // anon NON_STABLE: counts participants with `isAnonymous == true`
            let total_anon_participants = MetricOutputExtended::new_with_slices(
                format!("TotalAnonymousParticipants_{}", id).as_ref(),
                format!("TotalAnonymousParticipants_{}", id).as_ref(),
                "type",
                "Vinteo Native",
                format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(),
                Some(18),
                Some(String::from("module$12")),
                extract_total_anon_participants(item.clone()).await
            );
            // description
            buffer.push(total_participants);
            buffer.push(total_anon_participants);
            // one record per (participant, target measure) pair
            while let Some(participant) = parts_stream.next().await {
                buffer.append(
                    &mut gather_futures(
                        participant,
                        Arc::from(id.as_ref()),
                        Arc::from(desc.as_ref()),
                        vec![
                            ("txBitrate", "Скорость отправки данных"),
                            ("rxBitrate", "Скорость приёма данных"),
                            ("txFPS", "Количество отправляемых кадров в секунду"),
                            ("rxFPS", "Количество получаемых кадров в секунду"),
                            ("rxLost", "Количество потерянных пакетов на приёме"),
                            ("txLost", "Количество потерянных пакетов на отправке"),
                            ("jBLen", "Фазовое отклонение на цифровом канале"),
                            ("txLostPercent", "Процент потерянных пакетов на отправке"),
                            ("rxLostPercent", "Процент потерянных пакетов на приёме"),
                            ("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"),
                            ("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"),
                            ("txVideoCodec", "Используемый видеокодек на отправке"),
                            ("rxVideoCodec", "Используемый видеокодек на приёме"),
                            ("txAudioCodec", "Используемый аудиокодек на отправке"),
                            ("rxAudioCodec", "Используемый аудиокодек на приёме"),
                            ("txResolution", "Разрешение передаваемого видео"),
                            ("rxResolution", "Разрешение принимаемого видео"),
                            ("txH239Resolution", "Разрешение дополнительного видео-потока"),
                            ("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"),
                            // NOTE(review): "Длительсность" below looks like a typo for
                            // "Длительность" — user-facing string, confirm before changing.
                            ("duration", "Длительсность сеанса (в минутах)"),
                        ],
                    ).await
                    .into_iter()
                    // drop measures this participant does not expose
                    .filter(|metric| metric.is_some())
                    .map(|metric| metric.unwrap() )
                    .collect::<Vec<MetricOutputExtended>>()
                );
            }
        }
        // let metrics = Requester::get_metrics_from_jitter(jitter).await;
        if !buffer.is_empty() {
            let metrics = PrometheusMetricsExtended::new_zvks(buffer).await;
            match Exporter::export_extended_metrics(metrics).await {
                Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
                Err(er) => error!("Cannot export data: {}", er),
            }
        } else {
            warn!("Metrics array is empty. Ignoring exporting ...");
        }
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
    // get users' jitter info
}

View File

@ -1,4 +1,6 @@
use std::str::FromStr;
use chrono::Local;
use anyhow::Result;
use tracing::info;
@ -19,6 +21,20 @@ use tracing::info;
/// *depends on* : -
///
pub async fn setup_logger() -> Result<()> {
// Builder::new()
// .format(move |buf, record| {
// writeln!(
// buf,
// "|{}| {} [{}] - {}",
// "api-grubber",
// Local::now().format("%d-%m-%Y %H:%M:%S"),
// record.level(),
// record.args(),
// )
// })
// .filter(None, LevelFilter::Info)
// .target(env_logger::Target::Stdout)
// .init();
let log_level = std::env::var("IM_LOG_INFO").unwrap_or_else(|_| String::from("INFO"));
tracing_subscriber::fmt()

View File

@ -4,7 +4,6 @@ mod logger;
mod json;
mod export;
mod monitoring;
mod jitter;
use anyhow::Result;
use integr_structs::api::v3::Config;
@ -13,10 +12,7 @@ use config::{pull_local_config, init_config_grub_mechanism};
use net::init_api_grub_mechanism;
use tokio::sync::mpsc;
use tracing::{error, info, warn};
// ENODE_MONITORING_IP
use monitoring::get_metrics_from_monitoring;
// VINTEO_API_KEY
use jitter::init_grubbing_jitter;
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()>{
@ -36,30 +32,24 @@ async fn main() -> Result<()>{
},
}
});
let request_delay = std::env::var("IM_REQUEST_DELAY")
.unwrap_or_else(|_| String::from("5"))
.parse::<u32>()
.unwrap_or_else(|_| {
warn!("No delay was set, setting up as 5 secs ..."); 5
});
let event_grub = tokio::spawn(async move {
// GRAB USING eNODE.MONITORING API GATEWAY
if std::env::var("ENODE_MONITORING_IP").is_ok() {
match get_metrics_from_monitoring(0, request_delay as usize).await {
Ok(_) => info!("Grabing (eNODE.Monitoring) task de-initialized"),
Err(er) => error!("Grabing task returned an error : {}", er),
match get_metrics_from_monitoring(0, 5).await {
Ok(_) => {
info!("Grabing (eNODE.Monitoring) task deinitialized");
},
Err(er) => {
error!("Grabing task returned an error : {}", er);
},
}
// JITTER NATIVE GRAB TASK
} else if std::env::var("VINTEO_API_KEY").is_ok() {
match init_grubbing_jitter().await {
Ok(_) => info!("Grabing (Vinteo - Jitter native) task de-initialized"),
Err(er) => error!("Jitter grabing mechanism crushed : {}", er),
}
// NATIVE GRAB TASK USING `config_api.json`
} else {
match init_api_grub_mechanism(config, &mut rx).await {
Ok(_) => info!("Grabing task de-initialized"),
Err(er) => error!("Grabing task returned an error : {}", er),
Ok(_) => {
info!("Grabing task deinitialized");
},
Err(er) => {
error!("Grabing task returned an error : {}", er);
},
}
}
});

View File

@ -4,7 +4,7 @@ use serde_json::{Map, Value};
use reqwest::Client;
use tokio::sync::Semaphore;
use std::sync::Arc;
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, get_chunk_size};
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size};
use integr_structs::api::enode_monitoring::cmdb::Query;
use tokio::task::JoinHandle;
use std::pin::Pin;
@ -13,110 +13,7 @@ use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
use tracing::{error, info, warn};
use std::collections::HashMap;
/// Converts a plain list of device ids into CMDB request path fragments.
trait AsDeviceRequest {
    fn as_devices(self) -> Vec<String>;
}

/// Renders a value as the URL-encoded request body eNODE.Monitoring expects.
trait IntoEnodeRequset {
    fn into_enode_request(self) -> String;
}

impl AsDeviceRequest for Vec<String> {
    /// Maps every raw device id onto its `/measures/device$<id>` path.
    fn as_devices(self) -> Vec<String> {
        self.into_iter()
            .map(|dev| format!("/measures/device${}", dev))
            .collect()
    }
}

/// One measure discovered in the CMDB, keyed by its `dola_id`.
#[derive(Debug)]
struct MetricInstance {
    dola_id : String,
    name : String,
    desc : String,
    device : String,
    source : String,
}

impl IntoEnodeRequset for &[MetricInstance] {
    /// Renders the ids as a URL-encoded JSON array: `%5B%22id%22,...%5D`.
    fn into_enode_request(self) -> String {
        let ids: Vec<String> = self
            .iter()
            .map(|item| format!("%22{}%22", item.dola_id))
            .collect();
        format!("%5B{}%5D", ids.join(","))
    }
}
/// Descriptive metadata of a measure (everything except its id).
#[derive(Debug)]
struct MetricMeta {
    name : String,
    desc : String,
    device : String,
    source : String,
}

impl Default for MetricMeta {
    /// All-empty metadata, the placeholder before real values arrive.
    fn default() -> Self {
        Self {
            name: String::default(),
            desc: String::default(),
            device: String::default(),
            source: String::default(),
        }
    }
}
// Needed because this public trait mentions the private `MetricMeta` type.
#[allow(private_interfaces)]
pub trait LazyUnzipInstance {
    /// Re-keys the entries by `dola_id`, copying the descriptive fields of
    /// each `MetricInstance` into a fresh `MetricMeta`.
    fn lazy_unzip(&self) -> HashMap<String, MetricMeta>;
}

impl LazyUnzipInstance for &[MetricInstance] {
    fn lazy_unzip(&self) -> HashMap<String, MetricMeta> {
        self.iter().map(
            |obj|
            (
                obj.dola_id.to_string(),
                MetricMeta::new(
                    &obj.name,
                    &obj.desc,
                    &obj.device,
                    &obj.source
                )
            )
        ).collect()
    }
}
impl MetricInstance {
    /// Builds an instance by taking owned copies of every borrowed field.
    fn new(id : &str, name : &str, desc : &str, device : &str, source : &str) -> Self {
        Self {
            dola_id: id.to_string(),
            name: name.to_string(),
            desc: desc.to_string(),
            device: device.to_string(),
            source: source.to_string(),
        }
    }
}
impl MetricMeta {
    /// Builds metadata by taking owned copies of every borrowed field.
    fn new(name : &str, desc : &str, device : &str, source : &str) -> Self {
        Self {
            name: name.to_string(),
            desc: desc.to_string(),
            device: device.to_string(),
            source: source.to_string(),
        }
    }
}
// const IM_CONNECTION_TIMEOUT: String = std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string());
/// # Fn `get_metrics_from_monitoring`
///
@ -164,6 +61,10 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho
warn!("Session dropped, creating new ...");
break 'inner;
}
// if let Err(_) = a.get_measure_info(vec.clone()).await {
// warn!("Session dropped, creating new ...");
// break 'inner;
// }
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
}
}
@ -194,7 +95,6 @@ pub struct MonitoringImporter {
login : String,
password : String,
access_token : String,
api_token : String,
ts : String,
timeout : usize,
}
@ -217,7 +117,6 @@ impl MonitoringImporter {
login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()),
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
access_token : String::new(),
api_token : env::var("ENODE_API_TOKEN").unwrap_or_else(|_| String::new()),
ts : String::new(),
timeout : std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()).parse().unwrap_or_else(|_| 10)
}
@ -226,13 +125,7 @@ impl MonitoringImporter {
/// and can be used to pull and push info to and from CM
///
async fn is_valid(&self) -> bool {
!self.ip.is_empty() && ((!self.login.is_empty() && !self.password.is_empty() ) || !self.api_token.is_empty())
}
/// Function that checks is current `MonitoringImporter` valid
/// and can be used to pull and push info to and from CM
///
async fn is_minimal(&self) -> bool {
(self.login.is_empty() || self.password.is_empty()) && !self.api_token.is_empty()
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
}
/// A setter of `timestamp`
///
@ -254,43 +147,38 @@ impl MonitoringImporter {
#[tracing::instrument(name = "cm_fn_session_start", skip_all)]
pub async fn start_session(&mut self) -> anyhow::Result<()> {
if !self.is_valid().await {
if self.is_minimal().await {
return Err(Error::msg(format!("Given API-Token is no more actual now ({})", &self.access_token)));
}
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
}
if !self.api_token.is_empty() {
std::mem::swap(&mut self.access_token, &mut self.api_token);
info!("API-Token that was in the ENODE configuration was set as access-token");
} else {
let client = Client::new();
let url = format!("http://{}/e-data-front/auth/login", self.ip);
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
let mut delay = 1;
let client = Client::new();
let url = format!("http://{}/e-data-front/auth/login", self.ip);
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
let mut delay = 1;
loop {
let client = client
.post(&url)
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
.header("Content-Type", "application/json")
.json(&fortoken);
if let Ok(resp) = client.send().await {
match resp.json::<AuthResponse>().await {
Ok(auth) => {
self.set_ts(&fortoken.ts).await;
self.access_token = auth.access_token.to_owned();
tracing::trace!("Access key was changed");
break;
},
Err(er) => error!("Error with extracting access-key from CM response due to {}", er),
}
loop {
let client = client
.post(&url)
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
.header("Content-Type", "application/json")
.json(&fortoken);
// let resp = client.send().await?;
if let Ok(resp) = client.send().await {
// let auth = resp.json::<AuthResponse>().await?;
match resp.json::<AuthResponse>().await {
Ok(auth) => {
self.set_ts(&fortoken.ts).await;
self.access_token = auth.access_token.to_owned();
tracing::trace!("Access key was changed");
break;
},
Err(er) => error!("Error with extracting access-key from CM response due to {}", er),
}
error!("Error while trying to create a new session, waiting {} secs and retrying ...", delay);
tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
delay = delay * 2;
}
info!("Started a new CM session");
error!("Error while trying to create a new session, waiting {} secs and retrying ...", delay);
tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
delay = delay * 2;
}
info!("Started a new CM session");
Ok(())
}
@ -301,63 +189,32 @@ impl MonitoringImporter {
/// , where `(String, String)` is a tuple of measure `id` and `description`
/// (`name`)
#[tracing::instrument(name = "CM get metrics list mechanism", skip_all)]
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<MetricInstance>> {
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
tracing::trace!("Trying ti get measures list from CM ...");
let client = Client::new();
let mut vec: Vec<MetricInstance> = Vec::new();
let mut vec: Vec<(String, String)> = Vec::new();
let url = format!("http://{}/e-cmdb/api/query", self.ip);
let id_list = {
match std::env::var("ENODE_TARGET_DEVICES") {
Err(_) => vec![String::from("18"), String::from("19")],
Ok(var) => var.split(',').into_iter().map(|st| st.trim().to_string()).collect::<Vec<String>>(),
}
};
let list_of_devices = id_list.clone().as_devices();
let client = client
.post(url)
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
.header("Content-Type", "application/json")
.bearer_auth(&self.access_token)
.json(&Query::device_oriented(list_of_devices));
.header("access-token", &self.access_token)
.json(&Query::default());
let resp = client.send().await?.text().await?;
let resp: Value = serde_json::from_str(&resp)?;
if let Some(arr) = resp.as_array() {
for device in arr {
let device_id = {
match device.get("name") {
Some(name) => {
match serde_json::to_string(name) {
Ok(name) => {
name.split('$').last().unwrap_or_else(|| "undefined-device").to_owned()
},
Err(_) => "undefined-device".to_string(),
}
},
None => "undefined-device".to_string(),
}
};
let device_id = device_id.trim_end_matches('"');
if let Some(links) = device.get("links") {
if let Some(measures) = links.as_array() {
for measure in measures.iter() {
let dola_id = measure.get("id");
let id = measure.get("measure_id");
let source = measure.get("source_id");
let desc = measure.get("name");
if id.is_some() && source.is_some() && dola_id.is_some() {
let dola_id = format!("measure${}", dola_id.unwrap().as_i64().unwrap_or_else(|| 0));
let id = id.unwrap().as_str().unwrap_or_else(|| "no-name");
let source = source.unwrap().as_str().unwrap_or_else(|| "no-source");
let desc = desc.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "no description");
if source.is_empty() {
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`measure_id` or `source_id`)"));
}
vec.push(MetricInstance::new(&dola_id, id, desc, device_id.as_ref(), source));
}
}
for measure in arr {
let id = measure.get("id");
let cls = measure.get("cls");
let name = measure.get("name");
if id.is_some() && cls.is_some() {
let id = id.unwrap().as_i64().unwrap_or_default();
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
if cls.is_empty() {
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
}
vec.push((format!("{}${}", cls, id), name.to_string()));
}
}
} else {
@ -383,8 +240,8 @@ impl MonitoringImporter {
/// exprots all data by itselfs
///
#[tracing::instrument(name = "CM get measures info mechanism", skip_all)]
pub async fn get_measure_info(&self, measures: Arc<Vec<MetricInstance>>) -> anyhow::Result<()> {
tracing::trace!("Trying to get info about each measure ...");
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
tracing::trace!("Trying ti get info about each measure ...");
let mut sys = sysinfo::System::new();
sys.refresh_cpu_all();
// adaptive permition on task spawm to prevent system overload
@ -401,7 +258,7 @@ impl MonitoringImporter {
let arc = arc.clone();
let client = client.clone();
let hm = measure.lazy_unzip();
let measure = Arc::new(measure.into_enode_request());
let measure = Arc::new(measure.display());
let _permit = permit.acquire().await.unwrap();
let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
@ -420,8 +277,8 @@ impl MonitoringImporter {
match event.await {
Ok(val) => {
match crate::export::Exporter::export_extended_metrics(val?).await {
Ok(bytes) => {info!("Successfully transmitted {} bytes", bytes)},
Err(er) => error!("Cannot export data due to : `{}`", er),
Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes)},
Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er),
}
},
Err(er) => {
@ -447,14 +304,14 @@ impl MonitoringImporter {
measure: Arc<String>,
client: Arc<Client>,
arc: Arc<Self>,
hm: &HashMap<String, MetricMeta>,
hm: &HashMap<String, String>,
) -> anyhow::Result<PrometheusMetricsExtended> {
tracing::trace!("Processing CM endpoint with one or more measure names");
let resp = client
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
.timeout(tokio::time::Duration::from_secs(arc.timeout as u64))
.header("Content-Type", "application/json")
.bearer_auth(&arc.access_token)
.header("access-token", &arc.access_token)
.send().await?
.text().await?;
tokio::task::yield_now().await;
@ -479,7 +336,7 @@ impl MonitoringImporter {
///
/// 3) if `Value` is `_` -> returns error **Invalid JSON format**
///
fn extract_metric_data(json: Value, hm: &HashMap<String, MetricMeta>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
Box::pin(async move {
return match json {
Value::Object(obj) => {
@ -504,48 +361,52 @@ impl MonitoringImporter {
///
/// Searches for certain fields and aggregates it in the `MetricOutputExtended`
/// object
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, MetricMeta>) -> anyhow::Result<MetricOutputExtended> {
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
tracing::trace!("Processing atomic Object value in CM JSON-response");
let id = obj.get("$id");
let val = obj.get("value");
let description = {
let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null);
let zero = String::new();
if dola_ip.is_null() {
zero
} else {
hm.get(
dola_ip.as_str().unwrap_or_else(|| "")
)
.unwrap_or_else(|| &zero)
.to_owned()
}
};
if id.is_none() || val.is_none() {
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
}
let id = id.unwrap().as_str().unwrap_or_else(|| "");
let default_meta = MetricMeta::default();
let meta = hm.get(id).unwrap_or_else(|| &default_meta);
let id = id.replace("$", "_");
let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_");
let val = val.unwrap();
let device = meta.device.parse::<usize>().unwrap_or_else(|_| 0);
if id.is_empty() {
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
}
Ok(MetricOutputExtended::new_with_slices(
id.as_ref(),
&meta.name,
{
match val {
Ok(MetricOutputExtended {
id : id.to_owned(),
json_type : match val {
Value::Number(val) => {
if val.is_i64() {
"i64"
"i64".to_owned()
} else if val.is_u64() {
"u64"
"u64".to_owned()
} else {
"f64"
"f64".to_owned()
}
},
_ => "unknown",
}
_ => "unknown".to_owned(),
},
"enode.monitoring.api",
&meta.desc,
Some(device),
Some(meta.source.clone()),
val.clone(),
))
addr : "enode.monitoring.api".to_owned(),
desc : description,
value : val.clone()
})
}
}

View File

@ -97,8 +97,9 @@ impl<'a> ApiPoll<'a> {
error!("Bad JSON in response. Error: {}", er);
},
Ok(_) => {
let endpoint_name = &metrics.name;
let preproc = JsonParser::parse(&metrics.measure, &response);
let preproc = PrometheusMetrics::new(&service_id, preproc);
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
match Exporter::export_metrics(preproc).await {
Ok(bytes) => {
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);

View File

@ -12,6 +12,5 @@ publish = ["kellnr"]
[dependencies]
anyhow = "1.0.95"
chrono = "0.4.40"
dotenv = "0.15.0"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.135"

View File

@ -265,29 +265,21 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)]
pub struct MetricOutputExtended {
pub id : String,
pub name : String,
#[serde(rename = "type")]
pub json_type : String,
pub addr : String,
pub value : Value,
#[serde(rename = "description")]
pub desc : String,
pub status: usize,
pub device: Option<usize>,
pub source: Option<String>,
}
impl MetricOutputExtended {
pub fn new_with_slices(id : &str, name: &str, json_type : &str, addr: &str, desc : &str, device: Option<usize>, source: Option<String>, value : Value) -> Self {
pub fn new_with_slices(id : &str, json_type : &str, addr: &str, desc : &str, value : Value) -> Self {
MetricOutputExtended {
id : id.to_string(),
name : name.to_string(),
json_type : json_type.to_string(),
addr : addr.to_string(),
value : value,
desc : desc.to_string(),
device,
source,
status: 0,
}
}
}
@ -295,21 +287,21 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)]
pub struct PrometheusMetrics {
pub service_name: String,
// pub endpoint_name: String,
pub endpoint_name: String,
pub metrics: Vec<MetricOutput>,
}
impl PrometheusMetrics {
pub fn new(service: &str, metrics: Vec<MetricOutput>) -> Self {
pub fn new(service: &str, endpoint: &str, metrics: Vec<MetricOutput>) -> Self {
Self {
service_name: service.to_string(),
// endpoint_name: endpoint.to_string(),
endpoint_name: endpoint.to_string(),
metrics: metrics
}
}
pub async fn new_zvks(metrics: Vec<MetricOutput>) -> Self {
Self {
service_name : "zvks".to_owned(),
// endpoint_name : "apiforsnmp".to_owned(),
endpoint_name : "apiforsnmp".to_owned(),
metrics : metrics,
}
}
@ -323,21 +315,14 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)]
pub struct PrometheusMetricsExtended {
pub service_name: String,
pub endpoint_name: String,
pub metrics: Vec<MetricOutputExtended>,
}
impl PrometheusMetricsExtended {
pub fn new_empty_jitter() -> Self {
Self {
service_name : "zvks".to_owned(),
metrics : Vec::new(),
}
}
pub fn add(&mut self, metric: MetricOutputExtended) {
self.metrics.push(metric);
}
pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
Self {
service_name : "zvks".to_owned(),
endpoint_name : "apiforsnmp".to_owned(),
metrics : metrics,
}
}
@ -380,18 +365,18 @@ pub mod enode_monitoring {
pub struct Query {
id : Vec<String>,
data : Data,
// #[serde(rename = "postQuery")]
// post_query : String,
#[serde(rename = "postQuery")]
post_query : String,
#[serde(rename = "enableActions")]
enable_actions : bool,
ts : usize
}
impl Query {
pub fn device_oriented(devices: Vec<String>) -> Self {
impl Default for Query {
fn default() -> Self {
Self {
id : devices,
id : vec!["/measures/device$18".to_owned()],
data : Data::default(),
// post_query : "links".to_owned(),
post_query : "links".to_owned(),
enable_actions : false,
ts : 1740060679399
}
@ -419,7 +404,7 @@ pub mod enode_monitoring {
fn default() -> Self {
Self {
flatten : true,
filter : Filter::default(),
filter : Filter::default()
}
}
}
@ -489,9 +474,14 @@ pub mod enode_monitoring {
pub fn get_chunk_size(total_measures: usize) -> usize {
match total_measures {
0..=432 => total_measures,
433..=1008 => total_measures / 2,
_ => total_measures / 4,
0..=144 => total_measures,
145..=288 => total_measures / 4,
289..=432 => total_measures / 5,
433..=576 => total_measures / 6,
577..=720 => total_measures / 7,
721..=864 => total_measures / 8,
865..=1008 => total_measures / 9,
_ => total_measures / 10,
}
}