Compare commits

...

94 Commits

Author SHA1 Message Date
deployer3000 db810c178e Merge pull request 'rc' (#53) from rc into master 2025-07-04 15:18:24 +03:00
Vladislav Drozdov 3657720074 Merge pull request 'feature/device-out' (#51) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/51
2025-06-25 17:22:49 +03:00
prplV 0ba5ff0e9f docs ready
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-25 10:19:42 -04:00
prplV 24e486410e fixed Dockerfile 2025-06-25 09:43:15 -04:00
Vladislav Drozdov e72310f8ab Merge pull request 'for buildings' (#49) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master There was a failure building this commit Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/49
2025-06-23 10:36:03 +03:00
prplV 5c62677ca5 for buildings
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-23 03:25:23 -04:00
Vladislav Drozdov e4b8cf2d54 Merge pull request 'for Jenkins test' (#48) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master There was a failure building this commit Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/48
2025-06-16 10:30:16 +03:00
prplV eaa551be50 for test
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-16 03:27:06 -04:00
Vladislav Drozdov 903ace3197 Merge pull request 'another ref' (#45) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master There was a failure building this commit Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/45
2025-06-11 15:42:40 +03:00
prplV 91acb6da23 another ref
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-11 08:41:02 -04:00
Vladislav Drozdov 0a1324b28a Merge pull request '+ api token + ref' (#43) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master There was a failure building this commit Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/43
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-06-11 13:29:45 +03:00
prplV e3761f5513 + api token + ref
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-11 05:06:46 -04:00
deployer3000 b1196d5177 Merge pull request 'rc' (#42) from rc into master
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-06 16:06:03 +03:00
Vladislav Drozdov 5de11c05ff Merge pull request 'add config delay' (#41) from feature/device-out into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/41
2025-06-06 16:01:39 +03:00
prplV 0cf7565dff add config delay
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-06-06 09:00:05 -04:00
deployer3000 fd813b04b6 Merge pull request 'rc' (#40) from rc into master 2025-05-27 12:33:36 +03:00
Vladislav Drozdov c8bf5e7cd9 Merge pull request 'feature/178' (#39) from feature/178 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/39
2025-05-27 12:28:46 +03:00
deployer3000 04b4ffac2d Merge pull request 'Merge pull request 'feature/178' (#36) from feature/178 into rc' (#38) from rc into feature/178
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-05-27 12:18:50 +03:00
prplV 2f1de2b4f4 env vars added 2025-05-27 05:12:20 -04:00
deployer3000 dfe52ddc49 Merge pull request 'rc' (#37) from rc into master 2025-05-27 11:49:08 +03:00
Vladislav Drozdov f360b8158f Merge pull request 'feature/178' (#36) from feature/178 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
test-org/integration-module/pipeline/pr-feature/178 Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/36
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-05-27 11:43:53 +03:00
prplV 66b66b966b adding tracing
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-05-27 02:43:21 -04:00
prplV 7cc7c0799a fix and finish 2025-05-26 13:28:15 -04:00
prplV af604c55a6 metrics vinteo native update 2025-05-26 09:19:57 -04:00
deployer3000 03c9a12ffa Merge pull request 'rc' (#35) from rc into master 2025-05-20 12:30:28 +03:00
Vladislav Drozdov f4b560a454 Merge pull request 'feature/160' (#34) from feature/160 into rc
test-org/integration-module/pipeline/pr-master Build started... Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/34
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-05-20 12:26:32 +03:00
prplV adb1ba4c07 refactor
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-05-20 04:39:50 -04:00
prplV 774a517def final fixes 2025-05-20 04:37:09 -04:00
prplV 3cca316978 Merge branch 'master' into feature/160 2025-05-20 03:14:26 -04:00
prplV 05b173408e -comments 2025-05-20 03:10:11 -04:00
prplV 7c7db5e510 enode big update 2025-05-20 03:09:59 -04:00
deployer3000 f943abeed2 Merge pull request 'rc' (#32) from rc into master 2025-05-16 14:20:06 +03:00
Vladislav Drozdov e60d4d08e7 Merge pull request 'feature/ecmdb-auth' (#31) from feature/ecmdb-auth into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/31
2025-05-16 14:15:43 +03:00
prplV 2430581948 new version
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-05-16 05:35:38 -04:00
prplV a0b9365120 new auth 2025-05-16 05:35:34 -04:00
deployer3000 cf2fa453db Merge pull request 'rc' (#30) from rc into master 2025-04-30 14:10:11 +03:00
Vladislav Drozdov f1868cd300 Merge pull request 'timeout added' (#29) from feature/135 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/29
2025-04-30 14:05:47 +03:00
prplV b41090578d timeout added
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-04-30 07:03:36 -04:00
deployer3000 cd7a7c5c82 Merge pull request 'rc jitter' (#28) from rc into master 2025-04-30 13:46:28 +03:00
Vladislav Drozdov 1d8c0d5fc2 Merge pull request 'feature/135' (#27) from feature/135 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/27
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-04-30 13:40:45 +03:00
prplV 12ee29d699 jitter finish
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-04-30 06:39:17 -04:00
prplV 0ea8213346 .env.example update
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-04-29 11:34:29 -04:00
prplV b58c58b165 redactor mut 2025-04-29 11:32:56 -04:00
prplV 97369e6452 jitter support x2 2025-04-29 11:31:10 -04:00
prplV 0d449468de jitter support 2025-04-29 11:30:56 -04:00
deployer3000 8d7dea30e9 Merge pull request 'rc' (#26) from rc into master 2025-04-29 14:28:16 +03:00
Vladislav Drozdov 4b1a136e1e Merge pull request 'device19 support + load balance' (#25) from feature/135 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/25
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-04-29 14:23:15 +03:00
prplV f71f38c666 device19 support + load balance
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-04-29 06:24:23 -04:00
deployer3000 2e8fd1d17d Merge pull request 'rc' (#24) from rc into master 2025-04-28 16:17:55 +03:00
Vladislav Drozdov 76beff02ac Merge pull request 'or_else + ? logic to handle two-step env var check' (#23) from hotfix/1307 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/23
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-04-28 10:25:35 +03:00
prplV fa0895122c extended log
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-04-14 05:18:25 -04:00
prplV 91ead7d1ba or_else + ? logic to handle two-step env var check 2025-04-14 05:13:00 -04:00
yuobrezkov a30fb0d834 auto versioning added 2025-04-10 12:11:54 +03:00
deployer3000 8a80cc6844 Merge pull request 'rc' (#22) from rc into master 2025-04-04 16:56:29 +03:00
YurijO dc50f4e614 Merge pull request 'feature/statuszero' (#21) from feature/statuszero into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://192.168.2.61/deployer3000/integration-module/pulls/21
2025-04-04 16:49:29 +03:00
prplV dfeb0dbfa9 status model supprt
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-04-04 09:18:43 -04:00
prplV ff68178e42 +zero status 2025-04-04 09:01:25 -04:00
deployer3000 5344242ce8 Merge pull request 'rc' (#19) from rc into master 2025-03-12 17:48:41 +03:00
YurijO 1ba496f053 Merge pull request '-kellnr' (#20) from feature/1163 into rc
test-org/integration-module/pipeline/pr-master Build succeeded
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/20
2025-03-12 17:45:04 +03:00
prplV f069a81b0d -kellnr
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-12 17:43:17 +03:00
YurijO ce1a9d287d Merge pull request 'feature/1163' (#18) from feature/1163 into rc
test-org/integration-module/pipeline/pr-master There was a failure building this commit Details
Reviewed-on: http://192.168.2.61/deployer3000/integration-module/pulls/18
Reviewed-by: DmitriyA <faleo1999@mail.ru>
Reviewed-by: YurijO <ya@ya.ru>
2025-03-12 14:59:57 +03:00
prplV 10efa07aab kellnr test 2
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-12 13:20:19 +03:00
prplV 43e24c136c urls fixed 2025-03-12 13:02:17 +03:00
prplV 0562a1637c kellnr test 2025-03-12 13:01:48 +03:00
prplV f238f2ce28 timeout for reqs added 2025-03-12 12:58:54 +03:00
deployer3000 2847a5a1e9 Merge pull request 'rc' (#17) from rc into master 2025-03-11 13:04:25 +03:00
yuobrezkov 330463ea47 Changed Jenkinsfile for notify
test-org/integration-module/pipeline/pr-master Build succeeded
2025-03-11 12:58:31 +03:00
YurijO 588f278621 Merge pull request 'Исправление работы функции получения ключа доступа и создания сессии' (#16) from hotfix/1155 into rc
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/16
Reviewed-by: DmitriyA <faleo1999@mail.ru>
Reviewed-by: YurijO <ya@ya.ru>
2025-03-11 12:56:05 +03:00
prplV fdaa505aa5 1155 fixed + controled logs
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-11 12:15:37 +03:00
prplV 5ad9f209b0 inf getting session with incresing delay
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-11 10:43:54 +03:00
deployer3000 9d2e8100df Merge pull request 'rc' (#15) from rc into master 2025-03-10 16:19:29 +03:00
YurijO 9ec918088c Merge pull request 'hotfix/1148' (#14) from hotfix/1148 into rc
test-org/integration-module/pipeline/pr-master Build started... Details
test-org/integration-module/pipeline/pr-rc This commit looks good Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/14
Reviewed-by: YurijO <ya@ya.ru>
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-03-10 16:13:36 +03:00
prplV f98c39d488 + patch in version
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-10 15:42:20 +03:00
prplV e4d58696c0 #13 fixed, sessions now recreating
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-10 15:39:22 +03:00
prplV 9de3aa1629 - debug 2025-03-10 15:38:37 +03:00
deployer3000 48285eeb40 Merge pull request 'rc' (#12) from rc into master 2025-03-07 13:02:39 +03:00
YurijO ca80c70c80 Merge pull request 'temp export tracing' (#11) from feature/report-exports into rc
test-org/integration-module/pipeline/pr-master Build started... Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/11
Reviewed-by: YurijO <ya@ya.ru>
2025-03-07 12:40:22 +03:00
prplV 67868f4438 temp export tracing
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-07 12:29:12 +03:00
Vladislav Drozdov 9e2e5896ef Merge pull request 'refactor + tests' (#10) from feature/1126 into rc
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/10
Reviewed-by: YurijO <ya@ya.ru>
2025-03-07 12:21:56 +03:00
prplV 9354085662 monitoring final doc comments
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-07 10:21:04 +03:00
prplV 1d31dc6c59 debug rework to hide creds
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-06 18:10:30 +03:00
prplV 39f901e6a6 main refactor 2025-03-06 17:53:13 +03:00
prplV 4c828c968e export docs fix 2025-03-06 17:52:57 +03:00
prplV c028699bab - useless comment 2025-03-06 11:00:17 +03:00
prplV e952a07318 + loggger doc comms
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-06 10:59:31 +03:00
prplV 267e90c972 added monitoring doc comms
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-05 16:56:46 +03:00
prplV 96dc94bf0b added json doc-comment
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-05 13:40:41 +03:00
prplV 5e8eadac9b Metric template tempo added 2025-03-05 13:32:55 +03:00
prplV f6ad632c2d desctruct info for exporter 2025-03-05 13:26:56 +03:00
prplV ba82d59c74 added export doc-comments
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-05 13:17:08 +03:00
prplV 9499c97ce6 added config doc-comments
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-05 12:52:16 +03:00
deployer3000 68bee74756 Merge pull request 'rc' (#9) from rc into master 2025-03-04 16:44:57 +03:00
Vladislav Drozdov 8e32de3be3 Merge pull request 'feature/1117' (#8) from feature/1117 into rc
test-org/integration-module/pipeline/pr-master Build started... Details
test-org/integration-module/pipeline/pr-rc This commit looks good Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/8
2025-03-04 15:49:22 +03:00
deployer3000 c6e24cae42 Merge pull request 'rc' (#6) from rc into master 2025-02-28 17:05:59 +03:00
16 changed files with 1238 additions and 185 deletions

4
.dockerignore Normal file
View File

@ -0,0 +1,4 @@
/target
Cargo.lock
*.sock
.env

View File

@ -1,15 +1,47 @@
# Template .env for API grabber
# PostgreSQL connection [DEPRECATED]
# -------------------------------
DB_HOST = "ip.addr.postgresql.server"
DB_USER = "db_user"
DB_PASSWORD = "db_user_password"
DB_DBNAME = "db_name"
# Prometheus-Exporter info
# -------------------------------
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
# VINTEO Jitter puller (needed to init Jitter native grab)
# -------------------------------
VINTEO_URL_BASE = "http(s)://ip.ip.ip.ip:port"
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/to/something"
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/to/something"
VINTEO_API_KEY = "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"
# Status Model API support
# > if exists, ignore `EXPORTER_URL` var
STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input"
# eNODE.Monitoring configuration
# -------------------------------
# eNODE.Monitoring server IP
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# eNODE.Monitoring credentials
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # admin password is required
# List of target devices
ENODE_TARGET_DEVICES = "18, 19"
# to work with unlimit API-Token
ENODE_API_TOKEN = "sssswwwwaaaaffff"
# OPTIONAL SETTINGS
# -------------------------------
# IM configuration for max level of logging info
# for example DEBUG, INFO, WARN, ERROR, TRACE
IM_LOG_INFO = "INFO"
# IM configuration for setting up API connection
# timeout (in secs). Default value - 10
IM_CONNECTION_TIMEOUT = "10"
# IM configuration for delay of requests
# delay (in secs). Default value - 5
IM_REQUEST_DELAY = "20"

View File

@ -5,7 +5,11 @@ RUN apt update && apt install -y musl-tools
RUN rustup target add x86_64-unknown-linux-musl
COPY . .
RUN cargo test
ENV CARGO_HTTP_DEBUG=true
ENV CARGO_HTTP_TIMEOUT=100
RUN cargo test --verbose
RUN cargo build --release --target=x86_64-unknown-linux-musl
FROM alpine:latest

56
Jenkinsfile vendored
View File

@ -1,3 +1,24 @@
def notify(
String context,
String giteaUser,
String giteaPass,
String repositoryUrl,
String repositoryName,
String commitHash,
String buildStatus
) {
def status = buildStatus == 'success' ? 'success' : 'failure'
def description = buildStatus == 'success' ? 'Build succeeded' : 'Build failed'
sh """
curl -X POST \
-u "${giteaUser}:${giteaPass}" \
-H "Content-Type: application/json" \
-d '{"context":"${context}","state": "${status}", "description": "${description}"}' \
${repositoryUrl}deployer3000/${repositoryName}/statuses/${commitHash}
"""
}
pipeline {
agent any
environment {
@ -13,7 +34,22 @@ pipeline {
}
steps {
script {
env.IMAGE_TAG = sh(script: "git describe --tags --abbrev=0", returnStdout: true).trim()
def hasTags = sh(script: "git tag -l | wc -l", returnStdout: true).trim().toInteger() > 0
def lastVersion = "0.0.0"
if (hasTags) {
lastVersion = sh(script: "git describe --tags --abbrev=0", returnStdout: true).trim()
}
echo "Last version: ${lastVersion}"
def (major, minor, patch) = lastVersion.tokenize('.')
def newVersion = "${major}.${minor}.${patch.toInteger() + 1}"
echo "New version: ${newVersion}"
env.IMAGE_TAG = newVersion
env.NEW_VERSION = newVersion
}
}
}
@ -58,14 +94,30 @@ pipeline {
echo "Attempting to merge PR ${env.CHANGE_ID} into master..."
withCredentials([usernamePassword(credentialsId: 'gitea_creds', usernameVariable: 'GITEA_USER', passwordVariable: 'GITEA_PASS')]) {
def prId = env.CHANGE_ID
sh """
curl -X POST \
-u "${GITEA_USER}:${GITEA_PASS}" \
-H "Content-Type: application/json" \
-d '{"do":"merge"}' \
http://git.entcor/api/v1/repos/deployer3000/integration-module/pulls/${prId}/merge
http://git.entcor/api/v1/repos/deployer3000/${env.IMAGE_NAME}/pulls/${prId}/merge
"""
def commitHash = sh(script: "git rev-parse HEAD~1", returnStdout: true).trim() // необходим для корректного отображения статусов
echo "PR ${prId} merged successfully into master!"
sleep(time: 15, unit: 'SECONDS')
sh "git checkout master && git pull origin master"
sh """
curl -v -X POST -u "${GITEA_USER}:${GITEA_PASS}" \
-H "Content-Type: application/json" \
-d '{"tag_name": "${env.NEW_VERSION}", "name": "Release ${env.NEW_VERSION}", "target_commitish": "master"}' \
"${env.GITEA_REPOSITORY_URL}deployer3000/${env.IMAGE_NAME}/releases"
"""
echo "New release succeeded!"
def context = "test-org/${env.IMAGE_NAME}/pipeline/pr-${env.CHANGE_TARGET}"
notify(context, GITEA_USER, GITEA_PASS, env.GITEA_REPOSITORY_URL, env.IMAGE_NAME, commitHash, "success")
}
}
}

166
README.md
View File

@ -1,39 +1,173 @@
# Интеграционный модуль для проекта "Буревестник ВКС"
## Описание
`integr_mod` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
`Интеграционный модуль (ИМ)` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
- получение данных через API ВКС
- поддержку хранения, валидации и актуализации собственных конфигураций
- предобработку полученных данных и ~~отправку `Системе Мониторинга`~~ сохранение в БД
- предобработку полученных данных и сохранение в БД
- интеграции с `еНОД.Мониторинг`
## Специфика работы
На данный момент предусмотрено четыре режима работы:
1) **Нативный** - режим работы, производящий прямой опрос сервиса видео-конференц связи `Vinteo` и соответствующий процесс `ETL`
2) **Статичный** - режим работы *"посредник"*, когда все метрические данные ВКС `Vinteo` получаются через `REST-Full API` средства `еНОД.Мониторинг`
3) **Системный** - аналогичный **статичному** режиму, но метрические данные (заведомо обогащенные нулевым статусом) отправляются не напрямую в модуль `Prometheus exporter`, а в `Статусную модель`
4) **Vinteo** - особый режим работы, предполагающий сбор определенного набора метрик напрямую с ВКС `Vinteo` механизмом многоэтапного `API-запроса`
> **Примечание**
По стандарту `ИМ` работает в **НАТИВНОМ** режиме и ожидает конфигурации в формате `.json`, однако приоритетным считается **СТАТИЧНЫЙ** режим. Подробная информация о настройке в пункте `Руководство`
## Руководство
1. Заполнить .env файл или установить переменные окружения в соответствии с примером в `.env.example` файле
``` toml
# Template .env for API grabber
В данном разделе описан алгоритм настройки, сборки и запуска программного модуля `ИМ`
# Prometheus-Exporter info
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
### Преднастройка
# eNODE.Monitoring configuration
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"
# admin password is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
1. Выбор режима работы модуля, который скорректирует принцип настройки:
| Режим работы | .env | config-api.json | $STATUS_SYSTEM_URL | $EXPORTER_URL |
|---|---|---|---|---|
| Нативный | ❌ | ✅ | ❌ | ❌ |
| Статичный | ✅ | ❌ | ❌❌❌ | ✅ |
| Системный | ✅ | ❌ | ✅ | ❌ |
, где:
✅ -- следует настроить (предпринять)
❌ -- игнорируется системой, не стоит настраивать
❌❌❌ -- **НЕЛЬЗЯ** настраивать (предпринимать), возможны ошибки в работе
> Режим работы `Vinteo` *не описан* в таблице **намеренно**
### Настройка режима работы "Нативный"
Для настройки данного режима необходимо расположить в **активной** директории конфигурационный `config_api.json` файл:
``` json
{
"config": [
{
"id":"zvks",
"login" : "",
"pass" : "",
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
"period" : "",
"timeout" : "5",
"metrics" : [
{
"name": "conferences",
"url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
"measure": [
{ "id":"number", "type": "text", "addr": "data.conferences[].number" },
{ "id":"total", "type": "integer", "addr": "data.total" },
{ "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
{ "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
{ "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
]
},
{
"name": "abonents",
"url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
"measure": [
{ "id":"total", "type": "integer", "addr": "data.total" }
]
}
]
}
]
}
```
> **Примечание**
Название конфигурационного файла должно быть как в примере - `config_api.json`
### Настройка режима работы "Статичный"
Для настройки данного режима необходимо пополнить данными о сервере в `.env` файле по примеру:
``` toml
...
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port" # <--- экспорт данных (обязательно)
# eNODE.Monitoring configuration
ENODE_MONITORING_IP = "ip.ip.ip.ip"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"# ---> получение данных
# admin password is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
...
```
### Настройка режима работы "Системный"
Для настройки данного режима необходимо пополнить данными о сервере в `.env` файле по примеру:
``` toml
...
STATUS_SYSTEM_URL = "http(s)://{ip}:{port}/api/input"# <--- экспорт данных
# eNODE.Monitoring configuration
ENODE_MONITORING_IP = "{ip}.{ip}.{ip}.{ip}"
# admin user is required
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"# ---> получение данных
# admin password is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
...
```
### Настройка режима работы "Vinteo"
Для работы в данном режиме необходимо установить переменные окружения в соответствии со списком ниже
``` toml
...
VINTEO_URL_BASE = "https://demo.vcs.vinteo.dev"
VINTEO_ENDPOINT_CONFERENCES = "/api/v1/conferences"
VINTEO_ENDPOINT_PARTICIPANTS = "/api/v1/participants/"
VINTEO_API_KEY = "00000000000111111111.aaaaaaaaaaaaaaabbbbbbbbbbbbb"
...
```
### Настройка экспорта полученных и обработанных данных
Настройка *точки выхода* для полученных и обработанных метрик определяется установленными в переменных окружения параметрами, варианта два:
1) **Экспорт в статусную модель** в рамках механизма сквозного прохода данных в проекте `Буревестник ВКС`
``` toml
...
STATUS_SYSTEM_URL = "{BASE_URL}/{ROUTE}"
...
```
2) **Экспорт в экспортер или иной потребитель данных**
``` toml
...
EXPORTER_URL = "{BASE_URL}/{ROUTE}"
...
```
> ### **ОЧЕНЬ ВАЖНОЕ ПРИМЕЧАНИЕ**
> ---
> Одновременное использование `$STATUS_SYSTEM_URL` и `$EXPORTER_URL` **НЕДОПУСТИМО** !! Вариант со ссылкой **на статусную модель** является _по стандарту_ **БОЛЕЕ ПРИОРИТЕТНЫМ**, второй затрется, использовать необходимо только один
2. Произвести сборку проекта командой :
``` bash
cargo build --release
```
3. Запустить
> Debug версия
> **Debug** версия
``` bash
cargo run --bin api-grub
```
или
> Release версия
> **Release** версия
``` bash
cargo run --release --bin api-grub
```
@ -42,6 +176,6 @@ cargo run --release --bin api-grub
| Крейт (подмодуль) | Прогресс |
|---|---|
|`api-grub` | ✅✅✅✅✅✅✅✅✅🛠️ |
|`config-delivery [migrated]` | ❌❌❌❌❌❌❌❌❌❌ |
|`config-delivery` [migrated] | ❌❌❌❌❌❌❌❌❌❌ |
|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
|`preproc` [temp-deprecated] | ❌❌❌❌❌❌❌❌❌❌ | (разработка временно остановлена)

View File

@ -1,15 +1,20 @@
[package]
name = "api-grub"
version = "1.0.1"
version = "1.0.15"
edition = "2021"
authors = ["Vladislav Drozdov <maseeeeeeeed@gmail.com>"]
description = "API poller for ZVKS project"
homepage = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/api-grub"
repository = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/api-grub"
license = "MIT OR Apache-2.0"
keywords = ["api", "grub", "zvks"]
publish = ["kellnr"]
[dependencies]
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.135"
tokio = { version = "1.43.0", features = ["full"] }
integr-structs = {path = "../integr-structs"}
env_logger = "0.11.6"
log = "0.4.25"
integr-structs = { version = ">=0.1.0", path="../integr-structs"}
anyhow = "1.0.95"
chrono = "0.4.39"
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
@ -20,3 +25,8 @@ md5 = "0.7.0"
rand = "0.9.0"
sysinfo = "0.33.1"
openssl = { version = "0.10", features = ["vendored"] }
tracing-subscriber = "0.3.19"
tracing = "0.1.41"
lazy_static = "1.5.0"
futures = "0.3.31"
async-stream = "0.3.6"

View File

@ -2,7 +2,7 @@
// 1) check changes in unix-socket
// 2) save changes in local config file
use anyhow::{Error, Ok, Result};
use log::{info, warn, error};
use tracing::{info, warn, error};
use std::{fs, path::Path};
use serde_json::from_str;
use tokio::{io::AsyncReadExt, net::UnixListener};
@ -15,6 +15,20 @@ const CONFIG_PATH: &str = "config_api.json";
const SOCKET_PATH: &str = "api-grub.sock";
// TODO: rewrite to use current_exe
/// # Fn `pull_local_config`
///
/// ## function to one-time pulling local config by straight reading
///
/// ### Dev-Info :
///
/// *input* : -
///
/// *output* : `anyhow::Result<integr_structs::api::v3::Config>`
///
/// *initiator* : fn `main`
///
/// *managing* : -
///
pub async fn pull_local_config() -> Result<Config> {
let path = Path::new(CONFIG_PATH);
if path.exists() && path.is_file() {
@ -28,6 +42,22 @@ pub async fn pull_local_config() -> Result<Config> {
}
// for config pulling
/// # Fn `init_config_grub_mechanism`
///
/// ## function to init Unix-Socket listening for grabbing new configs
///
/// ### Dev-Info :
///
/// *input* : `&tokio::sync::mpsc::Sender<integr_structs::api::v3::Config>`
///
/// *output* : `anyhow::Result<()>`
///
/// *initiator* : fn `main`
///
/// *managing* : -
///
/// *depends on* : `const SOCKET_PATH`
///
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
info!("Initializing Unix-Socket listening for pulling new configs...");
let server = init_unix_listener().await?;
@ -57,12 +87,43 @@ pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
}
// saving new config locally
/// # Fn `save_new_config`
///
/// ## function for saving new config locally
///
/// ### Dev-Info :
///
/// *input* : `&String | &str`
///
/// *output* : `anyhow::Result<()>`
///
/// *initiator* : fn `main`
///
/// *managing* : -
///
/// *depends on* : `const CONFIG_PATH`
///
async fn save_new_config(config: &String) -> Result<()> {
fs::write(CONFIG_PATH, config)?;
Ok(())
}
/// # Fn `init_unix_listener`
///
/// ## function to bind a fresh Unix-Socket listener for config delivery
///
/// ### Dev-Info :
///
/// *input* : -
///
/// *output* : `anyhow::Result<tokio::net::UnixListener>`
///
/// *initiator* : fn `init_config_grub_mechanism`
///
/// *managing* : -
///
/// *depends on* : `const SOCKET_PATH`
///
async fn init_unix_listener() -> Result<UnixListener> {
let _ = fs::remove_file(SOCKET_PATH);
Ok(UnixListener::bind(SOCKET_PATH)?)

View File

@ -4,13 +4,45 @@ use reqwest::Client;
use tokio_postgres::NoTls;
use std::env;
use anyhow::Result;
use log::{info, error};
use tracing::{debug, error, info, trace};
use std::ops::Drop;
/// An entity which handles DB connections.
///
/// This struct is used to be a layer between API-grab workers
/// and the `pool` of DB connections to save grabbing processes statuses
/// (now `PostgreSQL`, slowly migrating to `ClickHouse` support)
///
/// # Examples
///
/// ```
/// use api_grub::export::Exporter;
///
/// let exporter = Exporter::init();
///
/// assert!(exporter.get_connection_from_pool().is_some());
/// ```
///
/// # Hint:
///
/// - use `export_data` method to export metrics to DB
///
/// - use `export_metrics` method to export metrics (`PrometheusMetrics`
/// type) to the Prometheus exporter (`$EXPORTER_URL`)
///
/// - use `export_metrics_extended` method to export metrics (
/// `*PrometehusMetricsExtended*` type with `desc` field) to the
/// Prometheus exporter (`$EXPORTER_URL`)
pub struct Exporter {
pool : Option<Pool>,
}
impl Exporter {
/// Fills `deadpool_postgres::Config` object with values from ENV VARS:
/// - `DB_HOST`
/// - `DB_DBNAME`
/// - `DB_USER`
/// - `DB_PASSWORD`
fn config_construct() -> Result<Config> {
let mut cfg = Config::new();
cfg.host = Some(env::var("DB_HOST")?);
@ -19,6 +51,9 @@ impl Exporter {
cfg.password = Some(env::var("DB_PASSWORD")?);
Ok(cfg)
}
/// Uses `deadpool_postgres::Config` object to create DB connections
/// pool to share between async tasks and to restrict a count of parallel
/// connections
fn pool_construct() -> Option<Pool> {
return match Self::config_construct() {
Ok(config) => {
@ -34,6 +69,7 @@ impl Exporter {
},
}
}
/// Checks if DB connections pool is empty
#[allow(unused)]
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
pub fn init() -> Self {
@ -41,6 +77,9 @@ impl Exporter {
pool : Self::pool_construct(),
}
}
/// Shares a connection `deadpool_postgres::Client as PgClient`
///
/// Function awaits til the moment it can return `Option<deadpool_postgres::Client as PgClient>`
#[allow(unused)]
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
if let Some(pool) = &self.pool {
@ -48,15 +87,22 @@ impl Exporter {
}
None
}
/// Exports data in `&str` jsonb format to DB using connection from the pool
#[allow(unused)]
#[tracing::instrument(name = "PostgreSQL export")]
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
let _ = client.query(&query, &[&metrics]).await?;
Ok(())
}
/// Exports metrics in `PrometheusMetrics` format to Exporter defined
/// as env var $EXPORTER_URL
#[tracing::instrument(name = "Prometheus export")]
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
let url = env::var("EXPORTER_URL")?;
debug!("Exporting: {:?}", &metrics);
let req = Client::new()
.post(url)
.json(&metrics)
@ -65,14 +111,31 @@ impl Exporter {
req?;
Ok(metrics.get_bytes_len())
}
/// Exports metrics in `PrometheusMetricsExtended` format to the Status
/// System (`$STATUS_SYSTEM_URL`) or, when that variable is not set, to the
/// Prometheus exporter (`$EXPORTER_URL`)
#[tracing::instrument(name = "Prometheus/Status System export")]
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
    // Prefer the Status System endpoint; fall back to the Prometheus exporter.
    // (Removed the stale duplicate `let url`/`.post(url)` lines left over
    // from a merge — a second `.post` on a `RequestBuilder` does not compile.)
    let url = env::var("STATUS_SYSTEM_URL").or_else(|err| {
        trace!("cannot fetch $STATUS_SYSTEM_URL var due to {}. working only with Prometheus exporter link", err);
        env::var("EXPORTER_URL")
    })?;
    debug!("Exporting: {:?}", &metrics);
    let req = Client::new()
        .post(&url)
        .json(&metrics)
        .send().await;
    req?;
    Ok(metrics.get_bytes_len())
}
}
impl Drop for Exporter {
    /// Custom destructor that logs de-initialization of the `Exporter`
    /// (fixes the "Deinitializng" typo in the log message)
    fn drop(&mut self) {
        info!("Deinitializing Exporter and DB connection pool ...")
    }
}

View File

@ -0,0 +1,319 @@
use std::collections::{HashMap, HashSet};
use integr_structs::api::v3::{PrometheusMetricsExtended, MetricOutputExtended};
use std::sync::Arc;
use reqwest::Client;
use lazy_static::lazy_static;
use tracing::{error, info, trace, warn};
use std::pin::Pin;
use serde_json::{Value, from_str};
use crate::export::Exporter;
use async_stream::stream;
use futures::{future, pin_mut, stream::Stream, StreamExt};
lazy_static! {
    // Base URL of the Vinteo VCS server; overridable via $VINTEO_URL_BASE
    static ref VINTEO_BASE: String = std::env::var("VINTEO_URL_BASE").unwrap_or_else(|_| String::from("https://demo.vcs.vinteo.dev"));
    // Endpoint listing active conferences; overridable via $VINTEO_ENDPOINT_CONFERENCES
    static ref CONFERENCES_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_CONFERENCES").unwrap_or_else(|_| String::from("/api/v1/conferences"));
    // Endpoint listing participants of one conference (conference number is appended)
    static ref USERS_ENDPOINT: String = std::env::var("VINTEO_ENDPOINT_PARTICIPANTS").unwrap_or_else(|_| String::from("/api/v1/participants/"));
}
// conferences ids: a set of (description, number) pairs pulled from the API
type Conferences = HashSet<(String, String)>;
// (description, number) -> raw participants JSON for that conference
type OutputConferences = HashMap<(String, String), Value>;
/// Wrapper over the per-conference JSON map that can be consumed as a stream
#[derive(Debug, Clone)]
struct ConferencesDigital(OutputConferences);
impl ConferencesDigital {
    /// Wraps an already-fetched conferences map
    pub fn new(conferences: OutputConferences) -> Self {
        Self(conferences)
    }
    /// Consumes `self`, yielding each conference id-pair with its payload;
    /// sleeps 10 ms between items to avoid a tight loop downstream
    pub fn go_across(self) -> impl Stream<Item = ((String, String), Arc<Value>)> {
        stream! {
            for (id_pair, payload) in self.0 {
                info!("Conference {} ({}) is processing ...", id_pair.0, id_pair.1);
                yield (id_pair, Arc::new(payload));
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
        }
    }
}
// to handle async http requests
/// HTTP client wrapper holding the Vinteo API base URL and API key
#[derive(Debug)]
struct Requester {
    // base URL, resolved from $VINTEO_URL_BASE (see `VINTEO_BASE`)
    base: String,
    // value of $VINTEO_API_KEY, sent as the `x-api-key` header
    api_key: String,
    client: Client,
}
impl Requester {
/// Builds a `Requester` from `$VINTEO_API_KEY` (required) and the lazily
/// resolved base URL; fails when the API key env var is missing or the
/// HTTP client cannot be built
pub fn new() -> anyhow::Result<Self> {
    Ok(Self {
        // `.clone()` already yields an owned String — the previous
        // `.clone().to_owned()` made a second, redundant copy
        base: VINTEO_BASE.clone(),
        api_key: std::env::var("VINTEO_API_KEY")?,
        client: Client::builder().user_agent("api-grub").build()?,
    })
}
/// GETs the raw conferences listing JSON from the Vinteo API
pub async fn get_conferences(&self) -> anyhow::Result<Value> {
    trace!("getting conferences list from API");
    let url = format!("{}{}", self.base, *CONFERENCES_ENDPOINT);
    let response = self.client.get(url)
        .timeout(tokio::time::Duration::from_secs(10))
        .header("accept", "application/json")
        .header("x-api-key", &self.api_key)
        .send()
        .await?;
    let body = response.text().await?;
    Ok(from_str(&body)?)
}
pub async fn get_conferences_from_value(conferences: Value) -> anyhow::Result<Conferences> {
trace!("extracting conferences list");
let mut hashset = Conferences::new();
match conferences.get("data") {
None => return Err(anyhow::Error::msg("Invalid JSON format: no `data` field")),
Some(data) => {
match data.get("conferences") {
None => return Err(anyhow::Error::msg("Invalid JSON format: no `conferences` field")),
Some(confs) => {
if let Some(confs) = confs.as_array() {
confs.iter()
.for_each(|conf_obj| {
if let Some(id) = conf_obj.get("number") {
if let Some(id) = id.as_str() {
let id = {
if let Some(desc) = conf_obj.get("description") {
match desc.as_str() {
Some(desc) => (desc.to_string(), id.to_string()),
None => (String::new(), id.to_string())
}
} else {
(String::new(), id.to_string())
}
};
hashset.insert(id);
}
}
});
} else {
return Err(anyhow::Error::msg("Invalid JSON format: `conferences` is not an array"))
}
},
}
},
}
Ok(hashset)
}
/// GETs participant data for every conference in `conferences`
///
/// Network / text-decoding failures for one conference are logged and that
/// conference is skipped; a malformed JSON body aborts the whole pass via `?`.
pub async fn get_users_jitter_by_conference(&self, conferences : Conferences) -> anyhow::Result<OutputConferences> {
    trace!("getting users list for each conference from API ...");
    let mut output = OutputConferences::new();
    for conf in conferences {
        let url = format!("{}{}{}", self.base, *USERS_ENDPOINT, &conf.1);
        let request = self.client.get(url)
            .timeout(tokio::time::Duration::from_secs(10))
            .header("accept", "application/json")
            .header("x-api-key", &self.api_key);
        match request.send().await {
            Err(er) => error!("Cannot GET data about conference - {:?}: {}", conf, er),
            Ok(response) => match response.text().await {
                Err(er) => error!("Cannot convert response into text: {}", er),
                Ok(body) => {
                    let parsed: Value = from_str(&body)?;
                    output.entry(conf).or_insert(parsed);
                }
            },
        }
    }
    Ok(output)
}
}
/// Streams each participant object of `conf` (`data.participants[]`)
/// as an owned `Arc<Value>`; yields nothing when the path is absent
fn across_participants(conf: &Value) -> impl Stream<Item = Arc<Value>> + '_ {
    trace!("fetching participants for conf and going across ...");
    stream! {
        let participants = conf
            .get("data")
            .and_then(|data| data.get("participants"))
            .and_then(|participants| participants.as_array());
        if let Some(list) = participants {
            for participant in list {
                yield Arc::new(participant.clone());
            }
        }
    }
}
/// Runs `extract_from_participant` concurrently for every `(target, description)`
/// pair and returns the results in the same order as `targets`
async fn gather_futures(parts: Arc<Value>, conf_id: Arc<str>, conf_desc: Arc<str>, targets: Vec<(&str, &str)>) -> Vec<Option<MetricOutputExtended>> {
    trace!("extracting {} measures for current participant in current conference {} ...", targets.len(), conf_id);
    // All futures are the same concrete type (same async fn), so the previous
    // per-future `Pin<Box<dyn Future>>` allocation was unnecessary —
    // `join_all` accepts the mapping iterator directly and preserves order.
    let futures = targets.into_iter().map(|(target, description)| {
        extract_from_participant(
            parts.clone(),
            target,
            conf_id.clone(),
            conf_desc.clone(),
            description,
        )
    });
    future::join_all(futures).await
}
// GET participants/{conference_id} data.total
/// Returns `data.total` from the participants response, or `Value::Null`
/// when the path is missing
async fn extract_total_participants(conferences : Arc<Value>) -> Value {
    conferences
        .get("data")
        .and_then(|data| data.get("total"))
        .cloned()
        .unwrap_or(Value::Null)
}
/// Counts participants whose `isAnonymous` flag is the boolean `true`
/// (`data.participants[].isAnonymous`); returns the count as a JSON number
async fn extract_total_anon_participants(conferences : Arc<Value>) -> Value {
    let sum = conferences
        .get("data")
        .and_then(|data| data.get("participants"))
        .and_then(|parts| parts.as_array())
        // `into_iter().flatten()` replaces the old `Option::iter` + `&&Vec`
        // deref dance; `filter(|&b| b)` replaces the `== true` comparison
        .into_iter()
        .flatten()
        .filter_map(|part| part.get("isAnonymous"))
        .filter_map(|is_anon| is_anon.as_bool())
        .filter(|&is_anon| is_anon)
        .count();
    Value::Number(sum.into())
}
// GET participants/{conference_id} data.participants[].isAnonymous
/// Builds one `MetricOutputExtended` for measure `target` of a single
/// participant; returns `None` when `params.<target>` is absent
async fn extract_from_participant(participant : Arc<Value>, target: &str, conf_id: Arc<str>, conf_desc: Arc<str>, description: &str) -> Option<MetricOutputExtended> {
    trace!("extracting `{}` measure for current participant ...", target);
    // Early `?` on the Option replaces the whole-body `if let` nesting
    let value = participant.get("params").and_then(|params| params.get(target))?;
    // `and_then(as_str)` collapses the old `unwrap_or_else(|| &Value::Null).as_str()` chain;
    // constant fallbacks use `unwrap_or` (no closure needed for a literal)
    let name = participant.get("watermark").and_then(Value::as_str).unwrap_or("unknown");
    let id = participant.get("id").and_then(Value::as_str).unwrap_or("unknown");
    let metric_name = format!("{}_{}_{}", target, conf_id, id);
    Some(
        MetricOutputExtended::new_with_slices(
            &metric_name,
            &metric_name,
            "type",
            "Vinteo native",
            format!(
                "{} для пользователя {} ({}) в конференции {} ({})",
                description,
                name,
                id,
                conf_desc,
                conf_id
            ).as_ref(),
            Some(18),
            Some("module$12".to_string()),
            value.clone()
        )
    )
}
/// Entry point of the Vinteo "jitter" grabbing loop
///
/// Every 5 seconds: pulls the conference list, then per-conference participant
/// stats, converts them into `MetricOutputExtended` records (two conference-level
/// totals plus one record per participant measure) and ships the batch through
/// `Exporter::export_extended_metrics`.
///
/// # Errors
/// Returns only when conference fetching/parsing fails (`?`); per-batch export
/// failures are logged and the loop continues.
pub async fn init_grubbing_jitter() -> anyhow::Result<()> {
    // create requester
    let requester = Requester::new()?;
    loop {
        info!("Getting conferences array ...");
        let conferences = requester.get_conferences().await?;
        let conferences = Requester::get_conferences_from_value(conferences).await?;
        info!("Getting jitter metrics by user in each conference ...");
        let confs = requester.get_users_jitter_by_conference(conferences).await?;
        info!("Initializing extraction mechanism ...");
        let conferences = ConferencesDigital::new(confs);
        let extracted_conference = conferences.go_across();
        let mut buffer: Vec<MetricOutputExtended> = Vec::new();
        pin_mut!(extracted_conference);
        while let Some(((desc, id), item)) = extracted_conference.next().await {
            let parts_stream = across_participants(&item);
            pin_mut!(parts_stream);
            // conference-level total of participants
            let total_participants = MetricOutputExtended::new_with_slices(
                format!("TotalParticipants_{}", id).as_ref(),
                format!("TotalParticipants_{}", id).as_ref(),
                "type",
                "Vinteo Native",
                format!("Общее количество участников в конференции {}", &desc).as_ref(),
                Some(18),
                Some(String::from("module$12")),
                extract_total_participants(item.clone()).await
            );
            // anon NON_STABLE
            let total_anon_participants = MetricOutputExtended::new_with_slices(
                format!("TotalAnonymousParticipants_{}", id).as_ref(),
                format!("TotalAnonymousParticipants_{}", id).as_ref(),
                "type",
                "Vinteo Native",
                format!("Общее количество анонимных участников в конференции {}", &desc).as_ref(),
                Some(18),
                Some(String::from("module$12")),
                extract_total_anon_participants(item.clone()).await
            );
            buffer.push(total_participants);
            buffer.push(total_anon_participants);
            // per-participant measures; `extend` + `flatten` replaces the old
            // `filter(is_some).map(unwrap)` into a temporary Vec + `append`
            while let Some(participant) = parts_stream.next().await {
                buffer.extend(
                    gather_futures(
                        participant,
                        Arc::from(id.as_ref()),
                        Arc::from(desc.as_ref()),
                        vec![
                            ("txBitrate", "Скорость отправки данных"),
                            ("rxBitrate", "Скорость приёма данных"),
                            ("txFPS", "Количество отправляемых кадров в секунду"),
                            ("rxFPS", "Количество получаемых кадров в секунду"),
                            ("rxLost", "Количество потерянных пакетов на приёме"),
                            ("txLost", "Количество потерянных пакетов на отправке"),
                            ("jBLen", "Фазовое отклонение на цифровом канале"),
                            ("txLostPercent", "Процент потерянных пакетов на отправке"),
                            ("rxLostPercent", "Процент потерянных пакетов на приёме"),
                            ("txH239Bitrate", "Скорость передачи данных в дополнительном видео-потоке"),
                            ("rxH239Bitrate", "Скорость приёма данных в дополнительном видео-потоке"),
                            ("txVideoCodec", "Используемый видеокодек на отправке"),
                            ("rxVideoCodec", "Используемый видеокодек на приёме"),
                            ("txAudioCodec", "Используемый аудиокодек на отправке"),
                            ("rxAudioCodec", "Используемый аудиокодек на приёме"),
                            ("txResolution", "Разрешение передаваемого видео"),
                            ("rxResolution", "Разрешение принимаемого видео"),
                            ("txH239Resolution", "Разрешение дополнительного видео-потока"),
                            ("rxH239Resolution", "Разрешение дополнительного видео-потока на принимающей стороне"),
                            ("duration", "Длительсность сеанса (в минутах)"),
                        ],
                    ).await
                    .into_iter()
                    .flatten()
                );
            }
        }
        if !buffer.is_empty() {
            let metrics = PrometheusMetricsExtended::new_zvks(buffer).await;
            match Exporter::export_extended_metrics(metrics).await {
                Ok(bytes) => info!("Successfully transmitted metrics ({} bytes)", bytes),
                Err(er) => error!("Cannot export data: {}", er),
            }
        } else {
            warn!("Metrics array is empty. Ignoring exporting ...");
        }
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}

View File

@ -2,6 +2,21 @@
use serde_json::{json, Value};
use integr_structs::api::v3::{Metric, MetricOutput};
/// A JSON-parser struct
///
/// Used to extract metrics from the server's metrics response
///
/// # Example
///
/// ```ignore
/// use api_grub::json::JsonParser;
/// use integr_structs::api::v3::Metric;
///
/// let json = br#"{"flat1" : { "room1" : { "rt_tempo" : "+16" }}}"#.to_vec();
///
/// assert!(!JsonParser::parse(vec![Metric::template()], json).is_empty());
/// ```
pub struct JsonParser;
impl JsonParser {

View File

@ -1,25 +1,38 @@
use chrono::Local;
use env_logger::Builder;
use log::LevelFilter;
use std::io::Write;
use std::str::FromStr;
use anyhow::Result;
use log::info;
use tracing::info;
/// # Fn `setup_logger`
///
/// ## function to init terminal logger
///
/// ### Dev-Info :
///
/// *input* : -
///
/// *output* : `anyhow::Result<()>`
///
/// *initiator* : fn `main`
///
/// *managing* : -
///
/// *depends on* : -
///
pub async fn setup_logger() -> Result<()> {
Builder::new()
.format(move |buf, record| {
writeln!(
buf,
"|{}| {} [{}] - {}",
"api-grubber",
Local::now().format("%d-%m-%Y %H:%M:%S"),
record.level(),
record.args(),
)
})
.filter(None, LevelFilter::Info)
.target(env_logger::Target::Stdout)
.init();
let log_level = std::env::var("IM_LOG_INFO").unwrap_or_else(|_| String::from("INFO"));
tracing_subscriber::fmt()
.with_max_level(
tracing::Level::from_str(&log_level)
.unwrap_or_else(|_| tracing::Level::INFO))
.with_writer(std::io::stdout)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::NEW)
// .with_timer(Local::now().format("%d-%m-%Y %H:%M:%S"))
.with_line_number(false)
.with_target(false)
.with_file(false)
.compact()
.init();
info!("Logger configured");
Ok(())
@ -29,22 +42,17 @@ pub async fn setup_logger() -> Result<()> {
#[cfg(test)]
mod logger_unittests {
    use tokio::test;
    use super::*;
    // Smoke test: the tracing subscriber configuration used by `setup_logger`
    // must initialize without panicking. (Removed the stale env_logger
    // `Builder` chain left in by a merge — it made the body unsyntactic.)
    #[test]
    async fn check_logger_builder() {
        tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .with_test_writer()
            .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NEW)
            .with_line_number(false)
            .with_target(false)
            .with_file(false)
            .compact()
            .init();
    }
}

View File

@ -4,17 +4,19 @@ mod logger;
mod json;
mod export;
mod monitoring;
mod jitter;
use anyhow::Result;
// use integr_structs::api::ApiConfigV2;
use integr_structs::api::v3::Config;
use logger::setup_logger;
// use log::{info, warn};
use config::{pull_local_config, init_config_grub_mechanism};
use net::init_api_grub_mechanism;
use tokio::sync::mpsc;
use log::{error, info, warn};
use tracing::{error, info, warn};
// ENODE_MONITORING_IP
use monitoring::get_metrics_from_monitoring;
// VINTEO_API_KEY
use jitter::init_grubbing_jitter;
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()>{
@ -23,10 +25,6 @@ async fn main() -> Result<()>{
let config = get_config().await;
// config update channel
let (tx, mut rx) = mpsc::channel::<Config>(1);
// futures
// todo : rewrite with spawn
// let config_fut = init_config_grub_mechanism(&tx);
// let grub_fut = init_api_grub_mechanism(config, &mut rx);
let event_config = tokio::spawn(async move {
match init_config_grub_mechanism(&tx).await {
@ -38,24 +36,30 @@ async fn main() -> Result<()>{
},
}
});
let request_delay = std::env::var("IM_REQUEST_DELAY")
.unwrap_or_else(|_| String::from("5"))
.parse::<u32>()
.unwrap_or_else(|_| {
warn!("No delay was set, setting up as 5 secs ..."); 5
});
let event_grub = tokio::spawn(async move {
// GRAB USING eNODE.MONITORING API GATEWAY
if std::env::var("ENODE_MONITORING_IP").is_ok() {
match get_metrics_from_monitoring(0, 5).await {
Ok(_) => {
info!("Grabing (eNODE.Monitoring) task deinitialized");
},
Err(er) => {
error!("Grabing task returned an error : {}", er);
},
match get_metrics_from_monitoring(0, request_delay as usize).await {
Ok(_) => info!("Grabing (eNODE.Monitoring) task de-initialized"),
Err(er) => error!("Grabing task returned an error : {}", er),
}
// JITTER NATIVE GRAB TASK
} else if std::env::var("VINTEO_API_KEY").is_ok() {
match init_grubbing_jitter().await {
Ok(_) => info!("Grabing (Vinteo - Jitter native) task de-initialized"),
Err(er) => error!("Jitter grabing mechanism crushed : {}", er),
}
// NATIVE GRAB TASK USING `config_api.json`
} else {
match init_api_grub_mechanism(config, &mut rx).await {
Ok(_) => {
info!("Grabing task deinitialized");
},
Err(er) => {
error!("Grabing task returned an error : {}", er);
},
Ok(_) => info!("Grabing task de-initialized"),
Err(er) => error!("Grabing task returned an error : {}", er),
}
}
});
@ -63,8 +67,7 @@ async fn main() -> Result<()>{
for event in events_handler {
let _ = event.await;
}
// let _ = tokio::join!(config_fut, grub_fut);
Ok(())
}

View File

@ -4,108 +4,360 @@ use serde_json::{Map, Value};
use reqwest::Client;
use tokio::sync::Semaphore;
use std::sync::Arc;
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size};
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, get_chunk_size};
use integr_structs::api::enode_monitoring::cmdb::Query;
use tokio::task::JoinHandle;
use std::pin::Pin;
use std::future::Future;
use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
use log::{error, info, warn};
use tracing::{error, info, warn};
use std::collections::HashMap;
/// Converts plain device ids into CM device-measure request paths
trait AsDeviceRequest {
    fn as_devices(self) -> Vec<String>;
}
/// Serializes a collection into the URL-encoded JSON-array form the eNODE API expects
trait IntoEnodeRequset {
    fn into_enode_request(self) -> String;
}
impl AsDeviceRequest for Vec<String> {
    // Map-and-collect instead of the former in-place `iter_mut` rewrite
    fn as_devices(self) -> Vec<String> {
        self.into_iter()
            .map(|dev| format!("/measures/device${}", dev))
            .collect()
    }
}
/// One measure pulled from CM together with its metadata
#[derive(Debug)]
struct MetricInstance {
    dola_id : String,
    name : String,
    desc : String,
    device : String,
    source : String,
}
impl IntoEnodeRequset for &[MetricInstance] {
    /// Builds `%5B%22id1%22,%22id2%22%5D` — the URL-encoded form of
    /// `["id1","id2"]` — from the `dola_id`s of the slice
    fn into_enode_request(self) -> String {
        // `join` replaces the manual Vec-of-fragments + comma bookkeeping
        let ids: Vec<String> = self.iter()
            .map(|val| format!("%22{}%22", val.dola_id))
            .collect();
        format!("%5B{}%5D", ids.join(","))
    }
}
/// Metadata for a measure: human name, description, owning device and source
///
/// `Default` is derived instead of hand-written — every field is a `String`,
/// so the derive produces exactly the old all-empty value.
#[derive(Debug, Default)]
struct MetricMeta {
    name : String,
    desc : String,
    device : String,
    source : String,
}
#[allow(private_interfaces)]
/// Builds a `dola_id -> metadata` lookup table from a measure list
pub trait LazyUnzipInstance {
    fn lazy_unzip(&self) -> HashMap<String, MetricMeta>;
}
impl LazyUnzipInstance for &[MetricInstance] {
    // Explicit loop + insert instead of map/collect; later duplicates
    // overwrite earlier ones, exactly as collect() does
    fn lazy_unzip(&self) -> HashMap<String, MetricMeta> {
        let mut map = HashMap::new();
        for obj in self.iter() {
            map.insert(
                obj.dola_id.to_string(),
                MetricMeta::new(
                    &obj.name,
                    &obj.desc,
                    &obj.device,
                    &obj.source,
                ),
            );
        }
        map
    }
}
impl MetricInstance {
    /// Plain field-by-field constructor from borrowed strings
    fn new(id : &str, name : &str, desc : &str, device : &str, source : &str) -> Self {
        Self {
            dola_id : String::from(id),
            name : String::from(name),
            desc : String::from(desc),
            device : String::from(device),
            source : String::from(source),
        }
    }
}
impl MetricMeta {
    /// Plain field-by-field constructor from borrowed strings
    fn new(name : &str, desc : &str, device : &str, source : &str) -> Self {
        Self {
            name : String::from(name),
            desc : String::from(desc),
            device : String::from(device),
            source : String::from(source),
        }
    }
}
/// # Fn `get_metrics_from_monitoring`
///
/// A function to init pulling and exporting metrics mechanism
/// from CM to Exporter. It spawns async tasks to get measures
/// and their values and then extract needed info to export
///
/// ### Dev-Info :
///
/// *input* : duration and delay as `usize` (in secs)
///
/// *output* : `anyhow::Result<()>`
///
/// *initiator* : fn `main`
///
/// *managing* : runtime of N async tasks (N - count of chunks)
///
/// # Example
///
/// ```
/// use api-grub::monitoring::get_metrics_from_monitoring;
///
/// // exec func without time restriction but with delay in 5 secs
/// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(()));
/// ```
///
#[tracing::instrument(name = "cm_fn_initiator", skip_all)]
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
let timer = tokio::time::Instant::now();
let mut a = MonitoringImporter::new().await;
'outer: loop {
let mut a = MonitoringImporter::new().await;
// let mut a = MonitoringImporter::new().await;
a.start_session().await?;
tracing::debug!("CM creds struct - {:#?}", a);
let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
tracing::debug!("Measures Vec - {:#?}", vec);
'inner: loop {
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
break 'outer;
}
if let Err(_) = a.get_measure_info(vec.clone()).await {
if vec.len() == 0 || a.get_measure_info(vec.clone()).await.is_err() {
warn!("Session dropped, creating new ...");
break 'inner;
}
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
}
}
Ok(())
}
/// An entity which handles CM credentials
///
/// Used to capture measures and their values, and to preprocess all measures
/// into the relevant Exporter structure
///
/// # Example:
///
/// ```ignore
/// use api_grub::monitoring::MonitoringImporter;
///
/// let mut a = MonitoringImporter::new().await;
/// a.start_session().await?;
/// let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
///
/// assert_eq!(a.get_measure_info(vec.clone()).await, Ok(()));
/// ```
///
// NOTE(review): the merge left two conflicting `#[derive]` lines
// (`Debug, Clone` and `Clone`) — a duplicate `Clone` impl does not compile.
// `Debug` is kept because `debug!("CM creds struct - {:#?}", a)` requires it.
#[derive(Debug, Clone)]
pub struct MonitoringImporter {
    ip : String,
    login : String,
    password : String,
    access_token : String,
    api_token : String,
    ts : String,
    timeout : usize,
}
impl MonitoringImporter {
/// The most simple constructor for `MonitoringImporter`
///
/// Returns `Self` object that is constructed according to
/// env vars:
/// - `ENODE_MONITORING_IP`
/// - `ENODE_MONITORING_LOGIN`
/// - `ENODE_MONITORING_PASSWORD`
/// - `ENODE_API_TOKEN`
/// - `IM_CONNECTION_TIMEOUT` (seconds, defaults to 10)
///
/// If env vars are not set, it returns `Self` with
/// empty fields
///
pub async fn new() -> Self {
    MonitoringImporter {
        // `unwrap_or_default()` replaces `unwrap_or_else(|_| String::new())`
        ip : env::var("ENODE_MONITORING_IP").unwrap_or_default(),
        login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_default(),
        password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_default(),
        access_token : String::new(),
        api_token : env::var("ENODE_API_TOKEN").unwrap_or_default(),
        ts : String::new(),
        // literal fallback needs no closure (`unwrap_or(10)`)
        timeout : std::env::var("IM_CONNECTION_TIMEOUT")
            .unwrap_or_else(|_| "10".to_string())
            .parse()
            .unwrap_or(10),
    }
}
/// Function that checks is current `MonitoringImporter` valid
/// and can be used to pull and push info to and from CM
///
async fn is_valid(&self) -> bool {
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
!self.ip.is_empty() && ((!self.login.is_empty() && !self.password.is_empty() ) || !self.api_token.is_empty())
}
/// Returns `true` when only an API token is configured — i.e. the
/// login/password pair is incomplete but `api_token` is set (the
/// "minimal" credential mode used by `start_session`)
///
async fn is_minimal(&self) -> bool {
    (self.login.is_empty() || self.password.is_empty()) && !self.api_token.is_empty()
}
/// A setter of `timestamp`
///
/// Stores the session-creation timestamp produced while establishing
/// a CM session (the `ts` of the `ForTokenCredentials` used to log in)
async fn set_ts(&mut self, ts: &str) {
    self.ts = ts.to_owned();
}
/// A function for creating a CM session
///
/// Returns `Ok(())` once a usable access token is in place. When an API
/// token is configured it is promoted to the access token directly;
/// otherwise the function logs in with login/password, retrying with
/// exponential backoff until a session is obtained.
///
/// *Also* it saves ts and access-key in its runtime environment;
/// there's no way to get the access-key of the session from outside
///
// NOTE(review): the merge left the old single-shot login flow in front of
// the new token/retry flow (duplicate `client`/`url`/`fortoken` bindings);
// only the new flow is kept here.
#[tracing::instrument(name = "cm_fn_session_start", skip_all)]
pub async fn start_session(&mut self) -> anyhow::Result<()> {
    if !self.is_valid().await {
        if self.is_minimal().await {
            return Err(Error::msg(format!("Given API-Token is no more actual now ({})", &self.access_token)));
        }
        return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
    }
    if !self.api_token.is_empty() {
        // Token mode: promote the configured API token to the session token
        std::mem::swap(&mut self.access_token, &mut self.api_token);
        info!("API-Token that was in the ENODE configuration was set as access-token");
    } else {
        let client = Client::new();
        let url = format!("http://{}/e-data-front/auth/login", self.ip);
        let fortoken = ForTokenCredentials::new(&self.login, &self.password);
        let mut delay = 1;
        loop {
            let request = client
                .post(&url)
                .timeout(tokio::time::Duration::from_secs(self.timeout as u64))
                .header("Content-Type", "application/json")
                .json(&fortoken);
            if let Ok(resp) = request.send().await {
                match resp.json::<AuthResponse>().await {
                    Ok(auth) => {
                        self.set_ts(&fortoken.ts).await;
                        self.access_token = auth.access_token.to_owned();
                        tracing::trace!("Access key was changed");
                        break;
                    },
                    Err(er) => error!("Error with extracting access-key from CM response due to {}", er),
                }
            }
            // Exponential backoff between login attempts
            error!("Error while trying to create a new session, waiting {} secs and retrying ...", delay);
            tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
            delay *= 2;
        }
        info!("Started a new CM session");
    }
    Ok(())
}
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
/// A function for pulling measures list
///
/// Used with actual credentials for current CM session
/// and returning measures in format of `Ok(Vec<(String, String)>)`
/// , where `(String, String)` is a tuple of measure `id` and `description`
/// (`name`)
#[tracing::instrument(name = "CM get metrics list mechanism", skip_all)]
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<MetricInstance>> {
tracing::trace!("Trying ti get measures list from CM ...");
let client = Client::new();
let mut vec: Vec<(String, String)> = Vec::new();
let mut vec: Vec<MetricInstance> = Vec::new();
let url = format!("http://{}/e-cmdb/api/query", self.ip);
let id_list = {
match std::env::var("ENODE_TARGET_DEVICES") {
Err(_) => vec![String::from("18"), String::from("19")],
Ok(var) => var.split(',').into_iter().map(|st| st.trim().to_string()).collect::<Vec<String>>(),
}
};
let list_of_devices = id_list.clone().as_devices();
let client = client
.post(url)
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
.header("Content-Type", "application/json")
.header("access-token", &self.access_token)
.json(&Query::default());
.bearer_auth(&self.access_token)
.json(&Query::device_oriented(list_of_devices));
let resp = client.send().await?.text().await?;
let resp: Value = serde_json::from_str(&resp)?;
if let Some(arr) = resp.as_array() {
for measure in arr {
let id = measure.get("id");
let cls = measure.get("cls");
let name = measure.get("name");
if id.is_some() && cls.is_some() {
// todo: later wait for Vaitaliy call of classification
let id = id.unwrap().as_i64().unwrap_or_default();
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
if cls.is_empty() {
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
for device in arr {
let device_id = {
match device.get("name") {
Some(name) => {
match serde_json::to_string(name) {
Ok(name) => {
name.split('$').last().unwrap_or_else(|| "undefined-device").to_owned()
},
Err(_) => "undefined-device".to_string(),
}
},
None => "undefined-device".to_string(),
}
};
let device_id = device_id.trim_end_matches('"');
if let Some(links) = device.get("links") {
if let Some(measures) = links.as_array() {
for measure in measures.iter() {
let dola_id = measure.get("id");
let id = measure.get("measure_id");
let source = measure.get("source_id");
let desc = measure.get("name");
if id.is_some() && source.is_some() && dola_id.is_some() {
let dola_id = format!("measure${}", dola_id.unwrap().as_i64().unwrap_or_else(|| 0));
let id = id.unwrap().as_str().unwrap_or_else(|| "no-name");
let source = source.unwrap().as_str().unwrap_or_else(|| "no-source");
let desc = desc.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "no description");
if source.is_empty() {
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`measure_id` or `source_id`)"));
}
vec.push(MetricInstance::new(&dola_id, id, desc, device_id.as_ref(), source));
}
}
}
vec.push((format!("{}${}", cls, id), name.to_string()));
}
}
} else {
@ -114,7 +366,25 @@ impl MonitoringImporter {
info!("List of measures was pulled, total - {}", &vec.len());
Ok(vec)
}
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
/// A function to get realtime data
///
/// It pulles info about 1 measure or a slice of measures and
/// exports all data to Prometehus exporter
///
/// # How it works
/// 1) creates a restriction for max count of async
/// tasks (`tokio::sync::Semaphore`)
///
/// 2) divides vec of measures in case of creating chunks with
/// the most optimal sizes to optimize self and server load
///
/// 3) spawns async tasks-grabbers to get measures info which
/// exprots all data by itselfs
///
#[tracing::instrument(name = "CM get measures info mechanism", skip_all)]
pub async fn get_measure_info(&self, measures: Arc<Vec<MetricInstance>>) -> anyhow::Result<()> {
tracing::trace!("Trying to get info about each measure ...");
let mut sys = sysinfo::System::new();
sys.refresh_cpu_all();
// adaptive permition on task spawm to prevent system overload
@ -131,11 +401,16 @@ impl MonitoringImporter {
let arc = arc.clone();
let client = client.clone();
let hm = measure.lazy_unzip();
let measure = Arc::new(measure.display());
let measure = Arc::new(measure.into_enode_request());
let _permit = permit.acquire().await.unwrap();
let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
Self::process_endpoint(measure.clone(), client.clone(), arc.clone(), &hm).await
Self::process_endpoint(
measure.clone(),
client.clone(),
arc.clone(),
&hm,
).await
});
jh_vec.push(jh);
@ -144,23 +419,42 @@ impl MonitoringImporter {
for event in jh_vec {
match event.await {
Ok(val) => {
if let Ok(val) = val {
match crate::export::Exporter::export_extended_metrics(val).await {
Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes)},
Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er),
}
match crate::export::Exporter::export_extended_metrics(val?).await {
Ok(bytes) => {info!("Successfully transmitted {} bytes", bytes)},
Err(er) => error!("Cannot export data due to : `{}`", er),
}
},
Err(er) => println!("Fatal error on async task: {}", er),
Err(er) => {
println!("Fatal error on async task: {}", er);
return Err(anyhow::Error::msg(format!("Fatal error on async task: {}", er)))
},
}
}
Ok(())
}
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
/// An async task-grabber
///
/// Used to create request to the CM server and
/// get all measure(s) data
///
/// # Also
/// An argument `measure: Arc<String>` can be a single measure like `measure$1` or
/// a slice of measures in special format `%5B%22measure$1%22,%20%22measure$2%22%5D`.
/// This is a neccesary measure to handle two types of requests and URL restrictions
///
async fn process_endpoint(
measure: Arc<String>,
client: Arc<Client>,
arc: Arc<Self>,
hm: &HashMap<String, MetricMeta>,
) -> anyhow::Result<PrometheusMetricsExtended> {
tracing::trace!("Processing CM endpoint with one or more measure names");
let resp = client
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
.timeout(tokio::time::Duration::from_secs(arc.timeout as u64))
.header("Content-Type", "application/json")
.header("access-token", &arc.access_token)
.bearer_auth(&arc.access_token)
.send().await?
.text().await?;
tokio::task::yield_now().await;
@ -170,7 +464,22 @@ impl MonitoringImporter {
PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
)
}
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
/// An recursive extractor of data
///
/// Uses target-json CM-Server response as `Value` and HashMap of
/// measures' `id`s and their appropriate `description`s
///
/// # How it works
/// 1) if `Value` is an `Object` -> executes `Self::process_value` on it and
/// returns result of the function as `Vec`
///
/// 2) if `Value` is an `Array` -> self-executes for each pat of the array
/// and aggregates all data in the `Vec` by using `.append(&mut Vec<...>)`
///
/// 3) if `Value` is `_` -> returns error **Invalid JSON format**
///
fn extract_metric_data(json: Value, hm: &HashMap<String, MetricMeta>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
Box::pin(async move {
return match json {
Value::Object(obj) => {
@ -190,50 +499,64 @@ impl MonitoringImporter {
}
})
}
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
/// A function-extractor for a single measure object
///
/// Searches for certain fields and aggregates it in the `MetricOutputExtended`
/// object
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, MetricMeta>) -> anyhow::Result<MetricOutputExtended> {
tracing::trace!("Processing atomic Object value in CM JSON-response");
let id = obj.get("$id");
let val = obj.get("value");
let description = {
let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null);
let zero = String::new();
if dola_ip.is_null() {
zero
} else {
hm.get(
dola_ip.as_str().unwrap_or_else(|| "")
)
.unwrap_or_else(|| &zero)
.to_owned()
}
};
if id.is_none() || val.is_none() {
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
}
let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_");
let id = id.unwrap().as_str().unwrap_or_else(|| "");
let default_meta = MetricMeta::default();
let meta = hm.get(id).unwrap_or_else(|| &default_meta);
let id = id.replace("$", "_");
let val = val.unwrap();
let device = meta.device.parse::<usize>().unwrap_or_else(|_| 0);
if id.is_empty() {
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
}
Ok(MetricOutputExtended {
id : id.to_owned(),
json_type : match val {
Ok(MetricOutputExtended::new_with_slices(
id.as_ref(),
&meta.name,
{
match val {
Value::Number(val) => {
if val.is_i64() {
"i64".to_owned()
"i64"
} else if val.is_u64() {
"u64".to_owned()
"u64"
} else {
"f64".to_owned()
"f64"
}
},
_ => "unknown".to_owned(),
_ => "unknown",
}
},
addr : "enode.monitoring.api".to_owned(),
desc : description,
value : val.clone()
})
"enode.monitoring.api",
&meta.desc,
Some(device),
Some(meta.source.clone()),
val.clone(),
))
}
}
impl std::fmt::Debug for MonitoringImporter {
    // Hand-written Debug impl (instead of #[derive(Debug)]) so that
    // credentials never reach log output: the password and access key
    // are replaced with fixed placeholder strings.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MonitoringImporter")
            .field("ip", &self.ip)
            .field("login", &self.login)
            .field("password", &"****")       // redacted on purpose
            .field("access_key", &"HIDDEN")   // redacted on purpose
            .field("ts", &self.ts)
            .finish()
    }
}

View File

@ -1,7 +1,7 @@
// module to handle unix-socket connection + pulling info from api
use anyhow::Result;
use log::{error, info};
use tracing::{error, info};
use rand::random;
use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, Duration};
@ -97,9 +97,8 @@ impl<'a> ApiPoll<'a> {
error!("Bad JSON in response. Error: {}", er);
},
Ok(_) => {
let endpoint_name = &metrics.name;
let preproc = JsonParser::parse(&metrics.measure, &response);
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
let preproc = PrometheusMetrics::new(&service_id, preproc);
match Exporter::export_metrics(preproc).await {
Ok(bytes) => {
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);

View File

@ -2,9 +2,16 @@
name = "integr-structs"
version = "0.1.0"
edition = "2021"
description = "Structs for API poller in ZVKS project"
homepage = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/integr-structs"
repository = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/integr-structs"
license = "MIT OR Apache-2.0"
keywords = ["api", "grub", "zvks", "structs", "contracts"]
publish = ["kellnr"]
[dependencies]
anyhow = "1.0.95"
chrono = "0.4.40"
dotenv = "0.15.0"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.135"

View File

@ -160,6 +160,16 @@ pub mod v3 {
pub json_type : String,
pub addr : String,
}
impl Metric {
pub fn template() -> Self {
Self {
id : "room_tempo".to_string(),
json_type : "String".to_string(),
addr : "flat1.room1.rt_tempo".to_string(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Metrics {
pub name : String,
@ -255,21 +265,29 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)]
pub struct MetricOutputExtended {
pub id : String,
pub name : String,
#[serde(rename = "type")]
pub json_type : String,
pub addr : String,
pub value : Value,
#[serde(rename = "description")]
pub desc : String,
pub status: usize,
pub device: Option<usize>,
pub source: Option<String>,
}
impl MetricOutputExtended {
pub fn new_with_slices(id : &str, json_type : &str, addr: &str, desc : &str, value : Value) -> Self {
pub fn new_with_slices(id : &str, name: &str, json_type : &str, addr: &str, desc : &str, device: Option<usize>, source: Option<String>, value : Value) -> Self {
MetricOutputExtended {
id : id.to_string(),
name : name.to_string(),
json_type : json_type.to_string(),
addr : addr.to_string(),
value : value,
desc : desc.to_string(),
device,
source,
status: 0,
}
}
}
@ -277,21 +295,21 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)]
pub struct PrometheusMetrics {
pub service_name: String,
pub endpoint_name: String,
// pub endpoint_name: String,
pub metrics: Vec<MetricOutput>,
}
impl PrometheusMetrics {
pub fn new(service: &str, endpoint: &str, metrics: Vec<MetricOutput>) -> Self {
pub fn new(service: &str, metrics: Vec<MetricOutput>) -> Self {
Self {
service_name: service.to_string(),
endpoint_name: endpoint.to_string(),
// endpoint_name: endpoint.to_string(),
metrics: metrics
}
}
pub async fn new_zvks(metrics: Vec<MetricOutput>) -> Self {
Self {
service_name : "zvks".to_owned(),
endpoint_name : "apiforsnmp".to_owned(),
// endpoint_name : "apiforsnmp".to_owned(),
metrics : metrics,
}
}
@ -305,14 +323,21 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)]
pub struct PrometheusMetricsExtended {
pub service_name: String,
pub endpoint_name: String,
pub metrics: Vec<MetricOutputExtended>,
}
impl PrometheusMetricsExtended {
/// Creates an empty metrics container for the fixed `"zvks"` service.
///
/// Metrics are expected to be appended afterwards, one by one, with
/// [`Self::add`].
pub fn new_empty_jitter() -> Self {
    Self {
        service_name : "zvks".to_owned(),
        metrics : Vec::new(),
    }
}
/// Appends a single metric to the collection.
pub fn add(&mut self, metric: MetricOutputExtended) {
    self.metrics.push(metric);
}
pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
Self {
service_name : "zvks".to_owned(),
endpoint_name : "apiforsnmp".to_owned(),
metrics : metrics,
}
}
@ -355,18 +380,18 @@ pub mod enode_monitoring {
pub struct Query {
id : Vec<String>,
data : Data,
#[serde(rename = "postQuery")]
post_query : String,
// #[serde(rename = "postQuery")]
// post_query : String,
#[serde(rename = "enableActions")]
enable_actions : bool,
ts : usize
}
impl Default for Query {
fn default() -> Self {
impl Query {
pub fn device_oriented(devices: Vec<String>) -> Self {
Self {
id : vec!["/measures/device$18".to_owned()],
id : devices,
data : Data::default(),
post_query : "links".to_owned(),
// post_query : "links".to_owned(),
enable_actions : false,
ts : 1740060679399
}
@ -394,7 +419,7 @@ pub mod enode_monitoring {
fn default() -> Self {
Self {
flatten : true,
filter : Filter::default()
filter : Filter::default(),
}
}
}
@ -411,7 +436,6 @@ pub mod enode_monitoring {
}
}
// "{\"access_token\":\"5BNQsmiGFQRNA651HeQxZekYgYUAWZ4e\",\"role\":\"administrator\",\"startRT\":\"2025-02-25T09:03:27.581Z\",\"login\":\"admin\",\"push_active\":null,\"$id\":\"systemuser$1\"}",
#[derive(Debug, Deserialize)]
pub struct AuthResponse {
pub access_token : String,
@ -465,14 +489,9 @@ pub mod enode_monitoring {
pub fn get_chunk_size(total_measures: usize) -> usize {
match total_measures {
0..=144 => total_measures,
145..=288 => total_measures / 4,
289..=432 => total_measures / 5,
433..=576 => total_measures / 6,
577..=720 => total_measures / 7,
721..=864 => total_measures / 8,
865..=1008 => total_measures / 9,
_ => total_measures / 10,
0..=432 => total_measures,
433..=1008 => total_measures / 2,
_ => total_measures / 4,
}
}