Merge pull request 'rc' (#6) from rc into master
commit
c6e24cae42
3
.env
3
.env
|
|
@ -1,3 +0,0 @@
|
||||||
CONFIG_SERVER_CREDS = "ws://127.0.0.1:8080"
|
|
||||||
API_GRUBBER_SOCKET = "api-grub.sock"
|
|
||||||
PREPROC_SOCKET = "preproc.sock"
|
|
||||||
|
|
@ -0,0 +1,15 @@
|
||||||
|
# Template .env for API grabber
|
||||||
|
|
||||||
|
# PostgreSQL connection [DEPRECATED]
|
||||||
|
DB_HOST = "ip.addr.postgresql.server"
|
||||||
|
DB_USER = "db_user"
|
||||||
|
DB_PASSWORD = "db_user_password"
|
||||||
|
DB_DBNAME = "db_name"1
|
||||||
|
|
||||||
|
# Prometheus-Exporter info
|
||||||
|
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
|
||||||
|
|
||||||
|
# eNODE.Monitoring configuration
|
||||||
|
ENODE_MONITORING_IP = "ip.ip.ip.ip"
|
||||||
|
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
|
||||||
|
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
/target
|
/target
|
||||||
Cargo.lock
|
Cargo.lock
|
||||||
*.sock
|
*.sock
|
||||||
|
.env
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
members = [
|
members = [
|
||||||
"crates/api-grub", "crates/config-delivery", "crates/integr-structs", "crates/preproc",
|
"crates/api-grub", "crates/integr-structs", "crates/preproc",
|
||||||
]
|
]
|
||||||
|
|
||||||
[profile.dev]
|
[profile.dev]
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,18 @@
|
||||||
|
FROM rust:1.84 AS builder
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
RUN apt update && apt install -y musl-tools
|
||||||
|
RUN rustup target add x86_64-unknown-linux-musl
|
||||||
|
|
||||||
|
COPY . .
|
||||||
|
RUN cargo test
|
||||||
|
RUN cargo build --release --target=x86_64-unknown-linux-musl
|
||||||
|
|
||||||
|
FROM alpine:latest
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
COPY --from=builder /app/target/x86_64-unknown-linux-musl/release/api-grub /app/api-grub
|
||||||
|
RUN apk add --no-cache ca-certificates
|
||||||
|
|
||||||
|
|
||||||
|
ENTRYPOINT ["/app/api-grub"]
|
||||||
|
|
@ -0,0 +1,80 @@
|
||||||
|
pipeline {
|
||||||
|
agent any
|
||||||
|
environment {
|
||||||
|
REGISTRY_NAME = 'registry.entcor/trust-module'
|
||||||
|
IMAGE_NAME = "integration-module"
|
||||||
|
GITEA_REPOSITORY_URL = "http://git.entcor/api/v1/repos/"
|
||||||
|
}
|
||||||
|
|
||||||
|
stages {
|
||||||
|
stage('Init variables') {
|
||||||
|
when {
|
||||||
|
expression { env.CHANGE_BRANCH?.startsWith('rc') }
|
||||||
|
}
|
||||||
|
steps {
|
||||||
|
script {
|
||||||
|
env.IMAGE_TAG = sh(script: "git describe --tags --abbrev=0", returnStdout: true).trim()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stage('Build tagged image and run tests') {
|
||||||
|
when {
|
||||||
|
expression { env.CHANGE_BRANCH?.startsWith('rc') }
|
||||||
|
}
|
||||||
|
steps {
|
||||||
|
script {
|
||||||
|
try {
|
||||||
|
def image = docker.build("${env.IMAGE_NAME}:${env.IMAGE_TAG}")
|
||||||
|
sh "docker tag ${env.IMAGE_NAME}:${env.IMAGE_TAG} ${env.REGISTRY_NAME}/${env.IMAGE_NAME}:${env.IMAGE_TAG}"
|
||||||
|
} catch (Exception e) {
|
||||||
|
error("Tests failed: ${e.message}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stage ('Push docker image to registry') {
|
||||||
|
when {
|
||||||
|
expression { env.CHANGE_BRANCH?.startsWith('rc') }
|
||||||
|
}
|
||||||
|
steps {
|
||||||
|
script {
|
||||||
|
docker.withRegistry('https://registry.entcor/harbor/', 'harbor-credentials-id') {
|
||||||
|
docker.image("${env.REGISTRY_NAME}/${env.IMAGE_NAME}:${env.IMAGE_TAG}").push()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
post {
|
||||||
|
always {
|
||||||
|
script {
|
||||||
|
echo "Cleaning up workspace..."
|
||||||
|
sh "rm -rf ${env.WORKSPACE}/rc/ || true"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
success {
|
||||||
|
script {
|
||||||
|
if (env.CHANGE_BRANCH?.startsWith('rc')) {
|
||||||
|
echo "Attempting to merge PR ${env.CHANGE_ID} into master..."
|
||||||
|
withCredentials([usernamePassword(credentialsId: 'gitea_creds', usernameVariable: 'GITEA_USER', passwordVariable: 'GITEA_PASS')]) {
|
||||||
|
def prId = env.CHANGE_ID
|
||||||
|
sh """
|
||||||
|
curl -X POST \
|
||||||
|
-u "${GITEA_USER}:${GITEA_PASS}" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{"do":"merge"}' \
|
||||||
|
http://git.entcor/api/v1/repos/deployer3000/integration-module/pulls/${prId}/merge
|
||||||
|
"""
|
||||||
|
echo "PR ${prId} merged successfully into master!"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
failure {
|
||||||
|
echo "Pipeline failed. Check the logs for details."
|
||||||
|
}
|
||||||
|
aborted {
|
||||||
|
echo "Pipeline was aborted."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -9,7 +9,7 @@
|
||||||
|
|
||||||
| Crate (submodule) | Progress |
|
| Crate (submodule) | Progress |
|
||||||
|---|---|
|
|---|---|
|
||||||
|`api-grub` | ✅✅✅✅✅✅✅🔲🔲🔲 |
|
|`api-grub` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
|
||||||
|`config-delivery` | ✅✅✅✅🔲🔲🔲🔲🔲🔲 |
|
|`config-delivery` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
|
||||||
|`integrs-structs` | ✅✅✅✅✅✅🔲🔲🔲🔲 |
|
|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
|
||||||
|`preproc` | 🔲🔲🔲🔲🔲🔲🔲🔲🔲🔲 |
|
|`preproc` [temp-deprecated] | ✅✅✅❌❌❌❌❌❌❌ | (разработка временно остановлена)
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,32 @@
|
||||||
{
|
{
|
||||||
"endpoints" : [
|
"config": [
|
||||||
{
|
{
|
||||||
"url" : "http://127.0.0.1:8081/ping",
|
"id":"zvks",
|
||||||
"method" : "GET"
|
"login" : "",
|
||||||
},
|
"pass" : "",
|
||||||
{
|
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
||||||
"url" : "http://127.0.0.1:8081/",
|
"period" : "",
|
||||||
"method" : "GET"
|
"timeout" : "5",
|
||||||
|
"metrics" : [
|
||||||
|
{
|
||||||
|
"name": "conferences",
|
||||||
|
"url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
|
||||||
|
"measure": [
|
||||||
|
{ "id":"number", "type": "text", "addr": "data.conferences[].number" },
|
||||||
|
{ "id":"total", "type": "integer", "addr": "data.total" },
|
||||||
|
{ "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
|
||||||
|
{ "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
|
||||||
|
{ "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "abonents",
|
||||||
|
"url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
|
||||||
|
"measure": [
|
||||||
|
{ "id":"total", "type": "integer", "addr": "data.total" }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
}
|
}
|
||||||
],
|
]
|
||||||
"delay" : 5
|
|
||||||
}
|
}
|
||||||
|
|
@ -13,3 +13,10 @@ log = "0.4.25"
|
||||||
anyhow = "1.0.95"
|
anyhow = "1.0.95"
|
||||||
chrono = "0.4.39"
|
chrono = "0.4.39"
|
||||||
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
|
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
|
||||||
|
deadpool-postgres = { version = "0.14.1", features = ["serde"] }
|
||||||
|
tokio-postgres = "0.7.12"
|
||||||
|
dotenv = "0.15.0"
|
||||||
|
md5 = "0.7.0"
|
||||||
|
rand = "0.9.0"
|
||||||
|
sysinfo = "0.33.1"
|
||||||
|
openssl = { version = "0.10", features = ["vendored"] }
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
// mod to communicate with api-grub config file
|
// mod to communicate with api-grub config file
|
||||||
// 1) check changes in unix-socket
|
// 1) check changes in unix-socket
|
||||||
// 2) save changes in local config file
|
// 2) save changes in local config file
|
||||||
use integr_structs::api::ApiConfig;
|
use integr_structs::api::ApiConfigV2;
|
||||||
use anyhow::{Error, Ok, Result};
|
use anyhow::{Error, Ok, Result};
|
||||||
use log::{info, warn, error};
|
use log::{info, warn, error};
|
||||||
use std::{fs, path::Path};
|
use std::{fs, path::Path};
|
||||||
|
|
@ -10,12 +10,13 @@ use tokio::{io::AsyncReadExt, net::UnixListener};
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use std::result::Result::Ok as stdOk;
|
use std::result::Result::Ok as stdOk;
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use integr_structs::api::v3::Config;
|
||||||
|
|
||||||
const CONFIG_PATH: &str = "config_api.json";
|
const CONFIG_PATH: &str = "config_api.json";
|
||||||
const SOCKET_PATH: &str = "api-grub.sock";
|
const SOCKET_PATH: &str = "api-grub.sock";
|
||||||
|
|
||||||
// todo! rewrite to use current_exe
|
// todo! rewrite to use current_exe
|
||||||
pub async fn pull_local_config() -> Result<ApiConfig> {
|
pub async fn pull_local_config() -> Result<Config> {
|
||||||
// let conf_path = std::env::current_exe()?;
|
// let conf_path = std::env::current_exe()?;
|
||||||
let path = Path::new(CONFIG_PATH);
|
let path = Path::new(CONFIG_PATH);
|
||||||
// return match conf_path.parent() {
|
// return match conf_path.parent() {
|
||||||
|
|
@ -28,7 +29,7 @@ pub async fn pull_local_config() -> Result<ApiConfig> {
|
||||||
// None => Err(Error::msg("No local conf was found"))
|
// None => Err(Error::msg("No local conf was found"))
|
||||||
// }
|
// }
|
||||||
if path.exists() && path.is_file() {
|
if path.exists() && path.is_file() {
|
||||||
let config: ApiConfig = from_str(
|
let config: Config = from_str(
|
||||||
&fs::read_to_string(CONFIG_PATH)?
|
&fs::read_to_string(CONFIG_PATH)?
|
||||||
)?;
|
)?;
|
||||||
Ok(config)
|
Ok(config)
|
||||||
|
|
@ -39,7 +40,7 @@ pub async fn pull_local_config() -> Result<ApiConfig> {
|
||||||
|
|
||||||
// for config pulling
|
// for config pulling
|
||||||
// ++++ reader to channel
|
// ++++ reader to channel
|
||||||
pub async fn init_config_grub_mechanism(tx: &Sender<ApiConfig>) -> Result<()> {
|
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
|
||||||
info!("Initializing Unix-Socket listening for pulling new configs...");
|
info!("Initializing Unix-Socket listening for pulling new configs...");
|
||||||
let server = init_unix_listener().await?;
|
let server = init_unix_listener().await?;
|
||||||
//
|
//
|
||||||
|
|
@ -51,7 +52,7 @@ pub async fn init_config_grub_mechanism(tx: &Sender<ApiConfig>) -> Result<()> {
|
||||||
if let Err(er) = stream.read_to_string(&mut buffer).await {
|
if let Err(er) = stream.read_to_string(&mut buffer).await {
|
||||||
warn!("Cannot read config from stream due to {}", er);
|
warn!("Cannot read config from stream due to {}", er);
|
||||||
} else {
|
} else {
|
||||||
let config: Result<ApiConfig, serde_json::Error> = from_str(&buffer);
|
let config: Result<Config, serde_json::Error> = from_str(&buffer);
|
||||||
if let stdOk(conf) = config {
|
if let stdOk(conf) = config {
|
||||||
info!("New config was pulled from Unix-Stream. Saving it locally and sharing with API-grub module...");
|
info!("New config was pulled from Unix-Stream. Saving it locally and sharing with API-grub module...");
|
||||||
if let Err(er) = save_new_config(&buffer).await {
|
if let Err(er) = save_new_config(&buffer).await {
|
||||||
|
|
@ -97,13 +98,13 @@ mod config_unittests {
|
||||||
#[test]
|
#[test]
|
||||||
async fn check_save_new_config() {
|
async fn check_save_new_config() {
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use integr_structs::api::ApiConfig;
|
use integr_structs::api::v3::Config;
|
||||||
use serde_json::to_string;
|
use serde_json::to_string;
|
||||||
|
|
||||||
let test_config_path = "test_config_api.json";
|
let test_config_path = "test_config_api.json";
|
||||||
|
|
||||||
// config gen
|
// config gen
|
||||||
let config = to_string::<ApiConfig>(&ApiConfig::default());
|
let config = to_string::<Config>(&Config::default());
|
||||||
assert!(config.is_ok());
|
assert!(config.is_ok());
|
||||||
let config = config.unwrap();
|
let config = config.unwrap();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,71 @@
|
||||||
|
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
|
||||||
|
use integr_structs::api::v3::PrometheusMetrics;
|
||||||
|
use reqwest::Client;
|
||||||
|
use tokio_postgres::NoTls;
|
||||||
|
use std::env;
|
||||||
|
use anyhow::{Result};
|
||||||
|
use log::{info, error};
|
||||||
|
|
||||||
|
pub struct Exporter {
|
||||||
|
pool : Option<Pool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Exporter {
|
||||||
|
fn config_construct() -> Result<Config> {
|
||||||
|
let mut cfg = Config::new();
|
||||||
|
cfg.host = Some(env::var("DB_HOST")?);
|
||||||
|
cfg.dbname = Some(env::var("DB_DBNAME")?);
|
||||||
|
cfg.user = Some(env::var("DB_USER")?);
|
||||||
|
cfg.password = Some(env::var("DB_PASSWORD")?);
|
||||||
|
Ok(cfg)
|
||||||
|
}
|
||||||
|
fn pool_construct() -> Option<Pool> {
|
||||||
|
return match Self::config_construct() {
|
||||||
|
Ok(config) => {
|
||||||
|
if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) {
|
||||||
|
info!("Connected to PostgreSQL");
|
||||||
|
return Some(pool);
|
||||||
|
}
|
||||||
|
None
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Bad DB credentials or it's unreachable");
|
||||||
|
None
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
||||||
|
pub fn init() -> Self {
|
||||||
|
Self {
|
||||||
|
pool : Self::pool_construct(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
||||||
|
if let Some(pool) = &self.pool {
|
||||||
|
return Some(pool.get().await.ok()?);
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||||
|
// client.
|
||||||
|
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
||||||
|
let _ = client.query(&query, &[&metrics]).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
||||||
|
let url = env::var("EXPORTER_URL")?;
|
||||||
|
// let req = Request::new(Method::PUT,
|
||||||
|
// Url::parse(metrics)?);
|
||||||
|
// dbg!(&metrics);
|
||||||
|
let req = Client::new()
|
||||||
|
.post(url)
|
||||||
|
.json(&metrics)
|
||||||
|
.send().await;
|
||||||
|
// dbg!(&req);
|
||||||
|
// dbg!(&req.unwrap().text().await);
|
||||||
|
// todo : rewrite with status code wrapping
|
||||||
|
req?;
|
||||||
|
Ok(metrics.get_bytes_len())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,155 @@
|
||||||
|
// use serde::{de::value, Serialize};
|
||||||
|
use serde_json::{json, Value};
|
||||||
|
use integr_structs::api::v3::{Metric, MetricOutput};
|
||||||
|
|
||||||
|
pub struct JsonParser;
|
||||||
|
|
||||||
|
impl JsonParser {
|
||||||
|
pub fn parse(targets: &Vec<Metric>, json: &str) -> Vec<MetricOutput> {
|
||||||
|
let mut res_vec: Vec<MetricOutput> = Vec::new();
|
||||||
|
for target in targets {
|
||||||
|
let metric = match target.addr.contains("[") {
|
||||||
|
true => JsonParser::get_sum_of_metrics_in_array(target, json),
|
||||||
|
false => JsonParser::get_metric(target, json),
|
||||||
|
};
|
||||||
|
res_vec.push(MetricOutput::new_with_slices(&target.id, &target.json_type, &target.addr, metric));
|
||||||
|
}
|
||||||
|
res_vec
|
||||||
|
}
|
||||||
|
fn get_sum_of_metrics_in_array(target: &Metric, json: &str) -> Value {
|
||||||
|
if target.addr.is_empty() {
|
||||||
|
return Value::Null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// let mut vec_value: Vec<Value> = Vec::new();
|
||||||
|
// let mut array_key = String::new();
|
||||||
|
let mut value_json: Value = serde_json::from_str(json).unwrap_or(Value::Null);
|
||||||
|
|
||||||
|
let target_attr_vec = target.addr
|
||||||
|
.split_terminator('.')
|
||||||
|
.collect::<Vec<&str>>();
|
||||||
|
// for keys in [] brackets
|
||||||
|
let mut key_tag = String::new();
|
||||||
|
|
||||||
|
for (global_idx, &key) in target_attr_vec.iter().enumerate() {
|
||||||
|
// if array
|
||||||
|
let key_checked = if key.contains('[') {
|
||||||
|
let key_idx = key.find("[").unwrap();
|
||||||
|
key_tag = key.chars()
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(idx, chr)| *idx > key_idx && *chr != ']')
|
||||||
|
.map(|(_, chr)| chr)
|
||||||
|
.collect::<String>();
|
||||||
|
// dbg!(&key_tag);
|
||||||
|
key.chars()
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(idx, _)| *idx < key_idx)
|
||||||
|
.map(|(_, chr)| chr)
|
||||||
|
.collect::<String>()
|
||||||
|
// dbg!(value_json.get(&array_key).unwrap_or(&Value::Null));
|
||||||
|
// value_json = value_json.get(array_key).unwrap_or(&Value::Null).clone();
|
||||||
|
// continue;
|
||||||
|
// new_key.as_str()
|
||||||
|
// dbg!(key); APPROVED
|
||||||
|
// TODO: need to check key in [] type of [KEY]
|
||||||
|
} else {key.to_owned()};
|
||||||
|
|
||||||
|
// if already array
|
||||||
|
match value_json.get(key_checked) {
|
||||||
|
Some(val) => {
|
||||||
|
match val {
|
||||||
|
Value::Array(array) => {
|
||||||
|
// form new target array
|
||||||
|
let new_array_target = target_attr_vec
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(idx, _)| *idx > global_idx)
|
||||||
|
.map(|(_, &chr)| chr.to_owned())
|
||||||
|
.collect::<Vec<String>>();
|
||||||
|
// get_values_in_array
|
||||||
|
// get_tags_in_array
|
||||||
|
// // slice_tags_with_values
|
||||||
|
// dbg!(&array);
|
||||||
|
// dbg!(&new_array_target);
|
||||||
|
let res_arr = Self::get_values_in_array(array, &new_array_target);
|
||||||
|
if &key_tag == "" {
|
||||||
|
return res_arr.into();
|
||||||
|
}
|
||||||
|
return Self::slice_with_tags_in_array(array, &res_arr, &key_tag).into()
|
||||||
|
|
||||||
|
},
|
||||||
|
_ => value_json = val.clone(),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => return Value::Null,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
value_json
|
||||||
|
}
|
||||||
|
|
||||||
|
fn slice_with_tags_in_array(array: &Vec<Value>, metrics: &Vec<Value>, tag_name: &str) -> Vec<Value> {
|
||||||
|
if tag_name.is_empty() {
|
||||||
|
return metrics.clone();
|
||||||
|
}
|
||||||
|
// array[0].as_object().unwrap_or(json!(Value::Null))
|
||||||
|
let mut values: Vec<Value> = Vec::new();
|
||||||
|
array.iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(idx, val)| {
|
||||||
|
let val = val.get(tag_name).unwrap_or(&Value::Null).clone();
|
||||||
|
(serde_json::from_value::<serde_json::Map<String, Value>>(
|
||||||
|
json!({"tag_name": tag_name, "tag_value": val})
|
||||||
|
),
|
||||||
|
serde_json::from_value::<serde_json::Map<String, Value>>(metrics[idx].clone()))
|
||||||
|
})
|
||||||
|
.for_each(|(tags, val)| {
|
||||||
|
if val.is_ok() && tags.is_ok() {
|
||||||
|
let mut tags = tags.unwrap();
|
||||||
|
let mut val = val.unwrap();
|
||||||
|
tags.append(&mut val);
|
||||||
|
dbg!(&tags);
|
||||||
|
values.push(json!(tags));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if values.len() == 0 {
|
||||||
|
return metrics.clone();
|
||||||
|
}
|
||||||
|
values
|
||||||
|
}
|
||||||
|
fn get_values_in_array(array: &Vec<Value>, fields: &Vec<String>) -> Vec<Value> {
|
||||||
|
let mut values: Vec<Value> = Vec::new();
|
||||||
|
for obj in array {
|
||||||
|
// dbg!(obj);
|
||||||
|
let mut obrez = obj.clone();
|
||||||
|
for field in fields {
|
||||||
|
obrez = obrez.get(field).unwrap_or(&Value::Null).clone();
|
||||||
|
match obrez {
|
||||||
|
Value::Object(_) => {continue},
|
||||||
|
_ => {
|
||||||
|
values.push(json!({field: obrez.clone()}));
|
||||||
|
},
|
||||||
|
// None => {values.push(Value::Null)},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
values
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_metric(target: &Metric, json: &str) -> Value {
|
||||||
|
if target.addr.is_empty() {
|
||||||
|
return Value::Null;
|
||||||
|
}
|
||||||
|
let mut value_json: Value = serde_json::from_str(json).unwrap_or(Value::Null);
|
||||||
|
|
||||||
|
let target_attr_vec = target.addr
|
||||||
|
.split_terminator('.')
|
||||||
|
.collect::<Vec<&str>>();
|
||||||
|
for key in target_attr_vec {
|
||||||
|
match value_json.get(key) {
|
||||||
|
Some(val) => value_json = val.clone(),
|
||||||
|
None => return Value::Null,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
value_json
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,35 +1,74 @@
|
||||||
mod config;
|
mod config;
|
||||||
mod net;
|
mod net;
|
||||||
mod logger;
|
mod logger;
|
||||||
|
mod json;
|
||||||
|
mod export;
|
||||||
|
mod monitoring;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use integr_structs::api::ApiConfig;
|
// use integr_structs::api::ApiConfigV2;
|
||||||
|
use integr_structs::api::v3::Config;
|
||||||
use logger::setup_logger;
|
use logger::setup_logger;
|
||||||
use log::{info, warn};
|
// use log::{info, warn};
|
||||||
use config::{pull_local_config, init_config_grub_mechanism};
|
use config::{pull_local_config, init_config_grub_mechanism};
|
||||||
use net::init_api_grub_mechanism;
|
use net::init_api_grub_mechanism;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
use log::{error, info, warn};
|
||||||
|
use monitoring::get_metrics_from_monitoring;
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
async fn main() -> Result<()>{
|
async fn main() -> Result<()>{
|
||||||
// 3 coroutines
|
dotenv::dotenv().ok();
|
||||||
// 1) unix-socket coroutine (for config updating)
|
|
||||||
// 2) api coroutine
|
|
||||||
// 3) ?
|
|
||||||
setup_logger().await?;
|
setup_logger().await?;
|
||||||
let config = get_config().await;
|
let config = get_config().await;
|
||||||
// config update channel
|
// config update channel
|
||||||
let (tx, mut rx) = mpsc::channel::<ApiConfig>(1);
|
let (tx, mut rx) = mpsc::channel::<Config>(1);
|
||||||
// futures
|
// futures
|
||||||
let config_fut = init_config_grub_mechanism(&tx);
|
// todo : rewrite with spawn
|
||||||
let grub_fut = init_api_grub_mechanism(config, &mut rx);
|
// let config_fut = init_config_grub_mechanism(&tx);
|
||||||
|
// let grub_fut = init_api_grub_mechanism(config, &mut rx);
|
||||||
|
|
||||||
let _ = tokio::join!(config_fut, grub_fut);
|
let event_config = tokio::spawn(async move {
|
||||||
|
match init_config_grub_mechanism(&tx).await {
|
||||||
|
Ok(_) => {
|
||||||
|
info!("Config task deinitialized");
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Config task returned an error : {}", er);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
});
|
||||||
|
let event_grub = tokio::spawn(async move {
|
||||||
|
if std::env::var("ENODE_MONITORING_IP").is_ok() {
|
||||||
|
match get_metrics_from_monitoring(0, 5).await {
|
||||||
|
Ok(_) => {
|
||||||
|
info!("Grabing (eNODE.Monitoring) task deinitialized");
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Grabing task returned an error : {}", er);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
match init_api_grub_mechanism(config, &mut rx).await {
|
||||||
|
Ok(_) => {
|
||||||
|
info!("Grabing task deinitialized");
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Grabing task returned an error : {}", er);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
let events_handler = vec![event_config, event_grub];
|
||||||
|
for event in events_handler {
|
||||||
|
let _ = event.await;
|
||||||
|
}
|
||||||
|
// let _ = tokio::join!(config_fut, grub_fut);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_config() -> ApiConfig {
|
async fn get_config() -> Config {
|
||||||
return match pull_local_config().await {
|
return match pull_local_config().await {
|
||||||
Ok(conf) => {
|
Ok(conf) => {
|
||||||
info!("Local config was loaded");
|
info!("Local config was loaded");
|
||||||
|
|
@ -37,7 +76,7 @@ async fn get_config() -> ApiConfig {
|
||||||
},
|
},
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
warn!("Cannot get local config due to {}", er);
|
warn!("Cannot get local config due to {}", er);
|
||||||
ApiConfig::default()
|
Config::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -0,0 +1,254 @@
|
||||||
|
use std::env;
|
||||||
|
use anyhow::Error;
|
||||||
|
use serde_json::{Map, Value};
|
||||||
|
use reqwest::Client;
|
||||||
|
use tokio::sync::Semaphore;
|
||||||
|
use std::sync::Arc;
|
||||||
|
// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl};
|
||||||
|
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size};
|
||||||
|
// use crate::structs::cmdb::Query;
|
||||||
|
use integr_structs::api::enode_monitoring::cmdb::Query;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
// use crate::structs::get_chunk_size;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::future::Future;
|
||||||
|
use integr_structs::api::v3::{MetricOutput, PrometheusMetrics};
|
||||||
|
use log::{error, info, warn};
|
||||||
|
// use chrono::{Local, DateTime};
|
||||||
|
|
||||||
|
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
||||||
|
let timer = tokio::time::Instant::now();
|
||||||
|
'outer: loop {
|
||||||
|
let mut a = MonitoringImporter::new().await;
|
||||||
|
a.start_session().await?;
|
||||||
|
'inner: loop {
|
||||||
|
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
|
||||||
|
break 'outer;
|
||||||
|
}
|
||||||
|
let vec = a.get_metrics_list().await.unwrap_or_else(|_| vec![]);
|
||||||
|
if vec.is_empty() {
|
||||||
|
warn!("Session dropped, creating new ...");
|
||||||
|
break 'inner;
|
||||||
|
}
|
||||||
|
let _ = a.get_measure_info(Arc::new(vec)).await;
|
||||||
|
// a.close_session().await?;
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct MonitoringImporter {
|
||||||
|
ip : String,
|
||||||
|
login : String,
|
||||||
|
password : String,
|
||||||
|
access_token : String,
|
||||||
|
ts : String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MonitoringImporter {
|
||||||
|
pub async fn new() -> Self {
|
||||||
|
MonitoringImporter {
|
||||||
|
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
|
||||||
|
login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()),
|
||||||
|
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
|
||||||
|
access_token : String::new(),
|
||||||
|
ts : String::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn is_valid(&self) -> bool {
|
||||||
|
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
|
||||||
|
}
|
||||||
|
async fn set_ts(&mut self, ts: &str) {
|
||||||
|
self.ts = ts.to_owned();
|
||||||
|
}
|
||||||
|
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
||||||
|
if !self.is_valid().await {
|
||||||
|
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
||||||
|
}
|
||||||
|
let client = Client::new();
|
||||||
|
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
||||||
|
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
||||||
|
// dbg!(&fortoken);
|
||||||
|
let client = client
|
||||||
|
.post(url)
|
||||||
|
.header("Content-Type", "application/json")
|
||||||
|
.json(&fortoken);
|
||||||
|
let resp = client.send().await?;
|
||||||
|
let auth = resp.json::<AuthResponse>().await?;
|
||||||
|
// dbg!(&auth);
|
||||||
|
self.set_ts(&fortoken.ts).await;
|
||||||
|
|
||||||
|
self.access_token = auth.access_token.to_owned();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub async fn close_session(&mut self) -> anyhow::Result<()> {
|
||||||
|
let client = Client::new();
|
||||||
|
let url = format!("http://{}/e-data-front/auth/logout", self.ip);
|
||||||
|
let client = client
|
||||||
|
.post(url)
|
||||||
|
.header("Content-Type", "application/json")
|
||||||
|
.header("access-token", &self.access_token);
|
||||||
|
|
||||||
|
let _ = client.send().await?;
|
||||||
|
|
||||||
|
self.access_token.clear();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<String>> {
|
||||||
|
let client = Client::new();
|
||||||
|
let mut vec: Vec<String> = Vec::new();
|
||||||
|
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
||||||
|
let client = client
|
||||||
|
.post(url)
|
||||||
|
.header("Content-Type", "application/json")
|
||||||
|
.header("access-token", &self.access_token)
|
||||||
|
.json(&Query::default());
|
||||||
|
let resp = client.send().await?.text().await?;
|
||||||
|
// dbg!(&resp.text().await);
|
||||||
|
let resp: Value = serde_json::from_str(&resp)?;
|
||||||
|
if let Some(arr) = resp.as_array() {
|
||||||
|
for measure in arr {
|
||||||
|
let id = measure.get("id");
|
||||||
|
let cls = measure.get("cls");
|
||||||
|
if id.is_some() && cls.is_some() {
|
||||||
|
// todo: later wait for Vaitaliy call of classification
|
||||||
|
let id = id.unwrap().as_i64().unwrap_or_default();
|
||||||
|
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
||||||
|
if cls.is_empty() {
|
||||||
|
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
|
||||||
|
}
|
||||||
|
// let measure_name = format!("{}${}", cls, id);
|
||||||
|
vec.push(format!("{}${}", cls, id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// dbg!(vec);
|
||||||
|
} else {
|
||||||
|
return Err(Error::msg("Invalid JSON in response"));
|
||||||
|
}
|
||||||
|
Ok(vec)
|
||||||
|
}
|
||||||
|
pub async fn get_measure_info(&self, measures: Arc<Vec<String>>) -> anyhow::Result<()> {
|
||||||
|
let mut sys = sysinfo::System::new();
|
||||||
|
sys.refresh_cpu_all();
|
||||||
|
// adaptive permition on task spawm to prevent system overload
|
||||||
|
let sem = Arc::new(Semaphore::new(sys.cpus().len()));
|
||||||
|
let mut jh_vec = Vec::new();
|
||||||
|
let client = Arc::new(Client::new());
|
||||||
|
let measures = measures.clone();
|
||||||
|
let arc = Arc::new(self.clone());
|
||||||
|
// dbg!(&measures.display());
|
||||||
|
|
||||||
|
// dbg!(&measures.len());
|
||||||
|
for measure in measures.chunks(get_chunk_size(measures.len())) {
|
||||||
|
let permit = sem.clone();
|
||||||
|
let arc = arc.clone();
|
||||||
|
let client = client.clone();
|
||||||
|
let measure = Arc::new(measure.display());
|
||||||
|
|
||||||
|
let _permit = permit.acquire().await.unwrap();
|
||||||
|
|
||||||
|
let jh: JoinHandle<anyhow::Result<PrometheusMetrics>> = tokio::spawn(async move {
|
||||||
|
Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await
|
||||||
|
|
||||||
|
});
|
||||||
|
jh_vec.push(jh);
|
||||||
|
}
|
||||||
|
// let mut vals = Vec::new();
|
||||||
|
for event in jh_vec {
|
||||||
|
match event.await {
|
||||||
|
Ok(val) => {
|
||||||
|
if let Ok(val) = val {
|
||||||
|
match crate::export::Exporter::export_metrics(val).await {
|
||||||
|
Ok(bytes) => info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes),
|
||||||
|
Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er),
|
||||||
|
}
|
||||||
|
// vals.push(val);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(er) => println!("Fatal error on async task: {}", er),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// dbg!(&vals);
|
||||||
|
// dbg!(&vals.len());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>) -> anyhow::Result<PrometheusMetrics> {
|
||||||
|
let resp = client
|
||||||
|
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
||||||
|
.header("Content-Type", "application/json")
|
||||||
|
.header("access-token", &arc.access_token)
|
||||||
|
.send().await?
|
||||||
|
.text().await?;
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
|
let resp: Value = serde_json::from_str(&resp)?;
|
||||||
|
// let a = Self::extract_metric_data(resp);
|
||||||
|
|
||||||
|
Ok(
|
||||||
|
PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await
|
||||||
|
)
|
||||||
|
}
|
||||||
|
fn extract_metric_data(json: Value) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutput>>> + Send>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
return match json {
|
||||||
|
Value::Object(obj) => {
|
||||||
|
// let resp: Value = serde_json::from_str(&obj)?;
|
||||||
|
return Ok(vec![Self::process_value(&obj).await?])
|
||||||
|
},
|
||||||
|
Value::Array(arr) => {
|
||||||
|
let mut vec = Vec::new();
|
||||||
|
for obj in arr {
|
||||||
|
if let Ok(mut val) = Self::extract_metric_data(obj).await {
|
||||||
|
// vec.push(val);
|
||||||
|
vec.append(&mut val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Ok(vec)
|
||||||
|
},
|
||||||
|
_ => Err(Error::msg("Invalid JSON format")),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
async fn process_value(obj : &Map<String, Value>) -> anyhow::Result<MetricOutput> {
|
||||||
|
let id = obj.get("id");
|
||||||
|
let val = obj.get("value");
|
||||||
|
|
||||||
|
if id.is_none() || val.is_none() {
|
||||||
|
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
|
||||||
|
}
|
||||||
|
let id = id.unwrap().as_str().unwrap_or_else(|| "");
|
||||||
|
let val = val.unwrap();
|
||||||
|
|
||||||
|
if id.is_empty() {
|
||||||
|
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
|
||||||
|
}
|
||||||
|
// pub struct MetricOutput {
|
||||||
|
// pub id : String,
|
||||||
|
// #[serde(rename = "type")]
|
||||||
|
// json_type : String,
|
||||||
|
// addr : String,
|
||||||
|
// pub value : Value,
|
||||||
|
// }
|
||||||
|
|
||||||
|
Ok(MetricOutput {
|
||||||
|
id : id.to_owned(),
|
||||||
|
json_type : match val {
|
||||||
|
Value::Number(val) => {
|
||||||
|
if val.is_i64() {
|
||||||
|
"i64".to_owned()
|
||||||
|
} else if val.is_u64() {
|
||||||
|
"u64".to_owned()
|
||||||
|
} else {
|
||||||
|
"f64".to_owned()
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => "unknown".to_owned(),
|
||||||
|
},
|
||||||
|
addr : "enode.monitoring.api".to_owned(),
|
||||||
|
value : val.clone()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,82 +1,35 @@
|
||||||
// module to handle unix-socket connection + pulling info from api
|
// module to handle unix-socket connection + pulling info from api
|
||||||
use anyhow::{Error, Result};
|
use anyhow::Result;
|
||||||
use integr_structs::api::ApiConfig;
|
// use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
|
use rand::random;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use reqwest::{Client, Method};
|
use reqwest::{Client, Method};
|
||||||
|
use std::hash::{Hash, Hasher};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
// use tokio::sync::Mutex;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use crate::json::JsonParser;
|
||||||
|
use crate::export::Exporter;
|
||||||
|
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
|
||||||
|
// use md5::compute;
|
||||||
|
|
||||||
struct RestMethod;
|
// type BufferType = Arc<Mutex<Vec<String>>>;
|
||||||
|
|
||||||
impl RestMethod {
|
|
||||||
pub async fn from_str(method: &str) -> Method {
|
|
||||||
return match method.trim().to_lowercase().as_str() {
|
|
||||||
"post" => Method::POST,
|
|
||||||
"patch" => Method::PATCH,
|
|
||||||
"put" => Method::PUT,
|
|
||||||
"delete" => Method::DELETE,
|
|
||||||
"get" | _ => Method::GET
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
struct ApiPoll<'a> {
|
|
||||||
config : &'a mut ApiConfig,
|
|
||||||
client : Client,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> ApiPoll<'a> {
|
|
||||||
pub async fn new(poll_cfg : &'a mut ApiConfig) -> Self {
|
|
||||||
Self {
|
|
||||||
config : poll_cfg,
|
|
||||||
client : Client::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// can be weak and with bug test needed
|
|
||||||
pub async fn change_config(&mut self, conf: ApiConfig) {
|
|
||||||
*self.config = conf;
|
|
||||||
}
|
|
||||||
pub async fn is_default(&self) -> bool {
|
|
||||||
self.config.endpoints.len() == 0
|
|
||||||
}
|
|
||||||
pub async fn process_polling(&self) -> Result<Vec<String>> {
|
|
||||||
let mut buffer: Vec<String> = vec![];
|
|
||||||
// TODO: rewrite nextly to async
|
|
||||||
for point in &self.config.endpoints {
|
|
||||||
// let a = self.client.get(&point.url).send().await.unwrap();
|
|
||||||
// a.text().await.unwrap();
|
|
||||||
match self.client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
|
|
||||||
Ok(resp) => {
|
|
||||||
if !resp.status().is_success() {
|
|
||||||
error!("ErrorCode in Response from API. Check configuration");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if let Ok(text) = resp.text().await {
|
|
||||||
info!("{}: {} - Successfull grubbing info", &point.method.to_uppercase(), &point.url);
|
|
||||||
buffer.push(text);
|
|
||||||
} else {
|
|
||||||
error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(er) => {
|
|
||||||
error!("{}: {} - Query crushed due to {}", &point.method.to_uppercase(), &point.url, er);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
match &buffer.len() {
|
|
||||||
0 => Err(Error::msg("Error due to API grubbing. Check config" )),
|
|
||||||
_ => Ok(buffer),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub async fn get_delay(&self) -> u32 {
|
|
||||||
self.config.delay
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// for api info pulling
|
// for api info pulling
|
||||||
pub async fn init_api_grub_mechanism(config: ApiConfig, rx: &mut Receiver<ApiConfig>) -> Result<()> {
|
pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {
|
||||||
info!("Initializing API-info grubbing mechanism...");
|
info!("Initializing API-info grubbing mechanism :");
|
||||||
|
info!("1) Loading vars from .env file if exists...");
|
||||||
|
let _ = dotenv().ok();
|
||||||
|
|
||||||
let mut config = config;
|
let mut config = config;
|
||||||
let mut poller = ApiPoll::new(&mut config).await;
|
let mut poller = ApiPoll::new(&mut config).await;
|
||||||
|
info!("2) Api-Poller has initialized");
|
||||||
|
let client = Exporter::init();
|
||||||
|
info!("3) Exporter has initialized");
|
||||||
|
let shared_pool = Arc::new(client);
|
||||||
loop {
|
loop {
|
||||||
if poller.is_default().await {
|
if poller.is_default().await {
|
||||||
sleep(Duration::from_secs(5)).await;
|
sleep(Duration::from_secs(5)).await;
|
||||||
|
|
@ -87,63 +40,256 @@ pub async fn init_api_grub_mechanism(config: ApiConfig, rx: &mut Receiver<ApiCon
|
||||||
info!("Config changed");
|
info!("Config changed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("Data from API: {:?}", poller.process_polling().await);
|
let shared_pool = shared_pool.clone();
|
||||||
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
|
||||||
|
// sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Ok(())
|
// Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod net_unittests {
|
|
||||||
use super::*;
|
|
||||||
use tokio::test;
|
|
||||||
use integr_structs::api::{ApiConfig, ApiEndpoint};
|
|
||||||
|
|
||||||
#[test]
|
struct RestMethod;
|
||||||
async fn check_str_to_rest_method() {
|
|
||||||
assert_eq!(RestMethod::from_str("get").await, Method::GET);
|
|
||||||
assert_eq!(RestMethod::from_str("post").await, Method::POST);
|
|
||||||
assert_eq!(RestMethod::from_str("patch").await, Method::PATCH);
|
|
||||||
assert_eq!(RestMethod::from_str("put").await, Method::PUT);
|
|
||||||
assert_eq!(RestMethod::from_str("delete").await, Method::DELETE);
|
|
||||||
assert_eq!(RestMethod::from_str("invalid_method").await, Method::GET);
|
|
||||||
}
|
|
||||||
#[test]
|
|
||||||
async fn check_api_poll_change_config() {
|
|
||||||
let mut conf1 = ApiConfig::default();
|
|
||||||
let conf2 = ApiConfig { endpoints : vec![], delay : 10, };
|
|
||||||
let mut poll = ApiPoll::new(&mut conf1).await;
|
|
||||||
poll.change_config(conf2).await;
|
|
||||||
assert_eq!(poll.config.delay, 10)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
impl RestMethod {
|
||||||
async fn check_api_poll_is_default() {
|
pub async fn from_str(method: &str) -> Method {
|
||||||
let mut conf1 = ApiConfig::default();
|
return match method.trim().to_lowercase().as_str() {
|
||||||
let poll = ApiPoll::new(&mut conf1).await;
|
"post" => Method::POST,
|
||||||
assert!(poll.is_default().await)
|
"patch" => Method::PATCH,
|
||||||
}
|
"put" => Method::PUT,
|
||||||
|
"delete" => Method::DELETE,
|
||||||
#[test]
|
"head" => Method::HEAD,
|
||||||
async fn check_api_grubbing_mechanism_on_public_one() {
|
"trace" => Method::TRACE,
|
||||||
use log::{set_max_level, LevelFilter};
|
"options" => Method::OPTIONS,
|
||||||
|
"connect" => Method::CONNECT,
|
||||||
set_max_level(LevelFilter::Off);
|
"get" | _ => Method::GET
|
||||||
let mut conf1 = ApiConfig {
|
}
|
||||||
endpoints : vec![
|
|
||||||
ApiEndpoint {
|
|
||||||
url : String::from("https://dummy-json.mock.beeceptor.com/countries"),
|
|
||||||
method: String::from("get"),
|
|
||||||
}],
|
|
||||||
delay : 10,
|
|
||||||
};
|
|
||||||
let conf2 = ApiConfig::default();
|
|
||||||
|
|
||||||
let mut poll = ApiPoll::new(&mut conf1).await;
|
|
||||||
assert!(poll.process_polling().await.is_ok());
|
|
||||||
|
|
||||||
poll.change_config(conf2).await;
|
|
||||||
assert!(poll.process_polling().await.is_err());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
struct ApiPoll<'a> {
|
||||||
|
config : &'a mut Config,
|
||||||
|
client : Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ApiPoll<'a> {
|
||||||
|
pub async fn new(poll_cfg : &'a mut Config) -> Self {
|
||||||
|
Self {
|
||||||
|
config : poll_cfg,
|
||||||
|
client : Client::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// can be weak and with bug test needed
|
||||||
|
pub async fn change_config(&mut self, conf: Config) {
|
||||||
|
*self.config = conf;
|
||||||
|
}
|
||||||
|
pub async fn is_default(&self) -> bool {
|
||||||
|
self.config.is_default().await
|
||||||
|
}
|
||||||
|
// pub async fn get_delay(&self) -> u32 {
|
||||||
|
// self.config.timeout
|
||||||
|
// }
|
||||||
|
pub async fn process_metrics(
|
||||||
|
service_id: Arc<String>,
|
||||||
|
metrics: Arc<Metrics>,
|
||||||
|
creds: Credentials,
|
||||||
|
// exporter: Arc<Exporter>
|
||||||
|
) -> Result<()> {
|
||||||
|
// processing metrics
|
||||||
|
// let mut req = Client::new()
|
||||||
|
// // .user_agent("api_grub/integration_module")
|
||||||
|
// .get(&metrics.url);
|
||||||
|
use std::hash::DefaultHasher;
|
||||||
|
|
||||||
|
let rand = random::<char>();
|
||||||
|
let mut hash = DefaultHasher::new();
|
||||||
|
rand.hash(&mut hash);
|
||||||
|
|
||||||
|
let client = Client::builder()
|
||||||
|
.user_agent(format!("api-grabber-{}", hash.finish()));
|
||||||
|
let mut req = client.build().unwrap().get(&metrics.url);
|
||||||
|
|
||||||
|
let login = &creds.endpoint.login;
|
||||||
|
let password = &creds.endpoint.password;
|
||||||
|
let api_key = &creds.endpoint.api_key;
|
||||||
|
|
||||||
|
if !login.is_empty() && !password.is_empty() {
|
||||||
|
// dbg!("kjgbkasgksjd");
|
||||||
|
req = req.basic_auth(login, Some(password));
|
||||||
|
}
|
||||||
|
if !api_key.is_empty() {
|
||||||
|
// req = req.bearer_auth(&api_key);
|
||||||
|
// req = req.header("authorization", "bearer ");
|
||||||
|
|
||||||
|
req = req.header("accept", "application/json");
|
||||||
|
req = req.header("x-api-key", api_key);
|
||||||
|
|
||||||
|
// req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
|
||||||
|
}
|
||||||
|
// dbg!(&req);
|
||||||
|
// let (client, res) = req.build_split();
|
||||||
|
// let res = res.unwrap();
|
||||||
|
// res.url_mut().is_special()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// dbg!(client);
|
||||||
|
// dbg!(res);
|
||||||
|
// todo!();
|
||||||
|
|
||||||
|
match req.send().await {
|
||||||
|
Ok(resp) => {
|
||||||
|
// dbg!(&resp.text().await);
|
||||||
|
if let Ok(response) = resp.text().await {
|
||||||
|
match serde_json::to_value(&response) {
|
||||||
|
Err(er) => {
|
||||||
|
error!("Bad JSON in response. Error: {}", er);
|
||||||
|
},
|
||||||
|
Ok(_) => {
|
||||||
|
let endpoint_name = &metrics.name;
|
||||||
|
let preproc = JsonParser::parse(&metrics.measure, &response);
|
||||||
|
// dbg!(serde_json::to_string_pretty(&preproc));
|
||||||
|
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
|
||||||
|
match Exporter::export_metrics(preproc).await {
|
||||||
|
Ok(bytes) => {
|
||||||
|
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Failed to export data to Prometheus due to {}", er);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("Bad response from {}. No data", &metrics.url);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Cannot API data from {} due to : {}", &metrics.url, er);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub async fn process_endpoint(
|
||||||
|
// client : Arc<Client>,
|
||||||
|
config : Arc<ConfigEndpoint>,
|
||||||
|
creds : Credentials,
|
||||||
|
// exporter : Arc<Exporter>
|
||||||
|
) -> Result<()> {
|
||||||
|
//
|
||||||
|
let period = config.get_period().unwrap_or(0);
|
||||||
|
let timeout = config.get_timeout().unwrap_or(5);
|
||||||
|
let metrics = Arc::new(config.metrics.clone());
|
||||||
|
let service_id = Arc::new(config.id.clone());
|
||||||
|
loop {
|
||||||
|
// let exporter = exporter.clone();
|
||||||
|
let creds = creds.clone();
|
||||||
|
let metrics = metrics.clone();
|
||||||
|
let service_id = service_id.clone();
|
||||||
|
let mut jh = Vec::<JoinHandle::<Result<()>>>::new();
|
||||||
|
|
||||||
|
for idx in 0..metrics.len() {
|
||||||
|
let creds = creds.clone();
|
||||||
|
let metrics = metrics.clone();
|
||||||
|
let service_id = service_id.clone();
|
||||||
|
let event = tokio::spawn(async move {
|
||||||
|
Self::process_metrics(
|
||||||
|
service_id.clone(),
|
||||||
|
metrics[idx].clone().into(),
|
||||||
|
creds.clone(),
|
||||||
|
).await
|
||||||
|
});
|
||||||
|
jh.push(event);
|
||||||
|
}
|
||||||
|
info!("Initializing another {} subjob(s) for `{}` service",
|
||||||
|
jh.len(),
|
||||||
|
&service_id
|
||||||
|
);
|
||||||
|
|
||||||
|
for i in jh {
|
||||||
|
let _ = i.await;
|
||||||
|
}
|
||||||
|
// processing
|
||||||
|
sleep(Duration::from_secs(timeout)).await
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
|
||||||
|
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
|
||||||
|
// let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
|
// let client = Arc::new(self.client.clone());
|
||||||
|
let config = Arc::new(self.config.clone());
|
||||||
|
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
|
||||||
|
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
|
|
||||||
|
for (idx, _) in config.config.iter().enumerate() {
|
||||||
|
// let for_creds = endpoints[idx].clone();
|
||||||
|
let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
|
||||||
|
let endpoint = endpoints[idx].clone();
|
||||||
|
// let client = client.clone();
|
||||||
|
let exporter = exporter.clone();
|
||||||
|
let join_handler = tokio::spawn(async move {
|
||||||
|
Self::process_endpoint(
|
||||||
|
// client,
|
||||||
|
endpoint,
|
||||||
|
creds,
|
||||||
|
// exporter.clone()
|
||||||
|
).await
|
||||||
|
});
|
||||||
|
join_handles.push(join_handler);
|
||||||
|
}
|
||||||
|
info!("Initializing {} task(s) for current config", join_handles.len());
|
||||||
|
for i in join_handles {
|
||||||
|
let _ = i.await;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// #[cfg(test)]
|
||||||
|
// mod net_unittests {
|
||||||
|
// use super::*;
|
||||||
|
// use tokio::test;
|
||||||
|
|
||||||
|
// #[test]
|
||||||
|
// async fn check_str_to_rest_method() {
|
||||||
|
// assert_eq!(RestMethod::from_str("get").await, Method::GET);
|
||||||
|
// assert_eq!(RestMethod::from_str("post").await, Method::POST);
|
||||||
|
// assert_eq!(RestMethod::from_str("patch").await, Method::PATCH);
|
||||||
|
// assert_eq!(RestMethod::from_str("put").await, Method::PUT);
|
||||||
|
// assert_eq!(RestMethod::from_str("delete").await, Method::DELETE);
|
||||||
|
// assert_eq!(RestMethod::from_str("invalid_method").await, Method::GET);
|
||||||
|
// }
|
||||||
|
// #[test]
|
||||||
|
// async fn check_api_poll_change_config() {
|
||||||
|
// let mut conf1 = ApiConfigV2::default();
|
||||||
|
// let conf2 = ApiConfigV2::pattern();
|
||||||
|
// let mut poll = ApiPoll::new(&mut conf1).await;
|
||||||
|
// poll.change_config(conf2).await;
|
||||||
|
// assert_eq!(poll.config.timeout, 1)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// #[test]
|
||||||
|
// async fn check_api_poll_is_default() {
|
||||||
|
// let mut conf1 = ApiConfigV2::default();
|
||||||
|
// let poll = ApiPoll::new(&mut conf1).await;
|
||||||
|
// assert!(poll.is_default().await)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// #[test]
|
||||||
|
// async fn check_api_grubbing_mechanism_on_public_one() {
|
||||||
|
// use log::{set_max_level, LevelFilter};
|
||||||
|
|
||||||
|
// set_max_level(LevelFilter::Off);
|
||||||
|
// let mut conf1 = ApiConfigV2::pattern();
|
||||||
|
// let conf2 = ApiConfigV2::default();
|
||||||
|
// let exporter = Arc::new(Exporter::init());
|
||||||
|
|
||||||
|
// let mut poll = ApiPoll::new(&mut conf1).await;
|
||||||
|
// assert!(poll.process_polling(exporter.clone()).await.is_ok());
|
||||||
|
|
||||||
|
// dbg!(&poll.config);
|
||||||
|
// poll.change_config(conf2).await;
|
||||||
|
// dbg!(&poll.config);
|
||||||
|
// assert!(poll.process_polling(exporter.clone()).await.is_err());
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "config-delivery"
|
|
||||||
version = "0.3.4"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
dotenv = "0.15.0"
|
|
||||||
rand = "0.8.5"
|
|
||||||
serde = { version = "1.0.217", features = ["derive"] }
|
|
||||||
serde_json = "1.0.135"
|
|
||||||
tokio = { version = "1.43.0", features = ["full"] }
|
|
||||||
tokio-websockets = { version = "^0.11.0", features = ["client", "openssl", "rand"] }
|
|
||||||
integr-structs = {path = "../integr-structs"}
|
|
||||||
anyhow = "1.0.95"
|
|
||||||
env_logger = "0.11.6"
|
|
||||||
log = "0.4.25"
|
|
||||||
chrono = "0.4.39"
|
|
||||||
|
|
@ -1,43 +0,0 @@
|
||||||
// mod to communicate with api-grub and preproc services
|
|
||||||
// using Unix-Socket Client
|
|
||||||
|
|
||||||
use anyhow::{Error, Result};
|
|
||||||
use integr_structs::api::ApiConfig;
|
|
||||||
use tokio::time::{sleep, Instant};
|
|
||||||
use tokio::net::UnixStream;
|
|
||||||
use std::env;
|
|
||||||
|
|
||||||
enum UnixSocketConsumer {
|
|
||||||
ApiGrubber,
|
|
||||||
Preproc,
|
|
||||||
}
|
|
||||||
// to create us-client
|
|
||||||
struct UnixSocketClient;
|
|
||||||
|
|
||||||
impl UnixSocketConsumer {
|
|
||||||
pub async fn get_stream_object(&self) -> Result<UnixStream> {
|
|
||||||
let socket_file = match self {
|
|
||||||
UnixSocketConsumer::ApiGrubber => env::var("API_GRUBBER_SOCKET")?,
|
|
||||||
UnixSocketConsumer::Preproc => env::var("PREPROC_SOCKET")?,
|
|
||||||
};
|
|
||||||
UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg("Cannot create Unix-Socket client")))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn check_unix_socket_file(path: &str) -> bool {
|
|
||||||
std::path::Path::new(path).exists()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(tests)]
|
|
||||||
mod delivery_unittests {
|
|
||||||
use super::*;
|
|
||||||
use tokio::test;
|
|
||||||
|
|
||||||
//
|
|
||||||
#[test]
|
|
||||||
async fn check_api_unix_socket_client_creation() { assert!(true); }
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
async fn check_preproc_unix_socket_client_creation() { assert!(true); }
|
|
||||||
//
|
|
||||||
}
|
|
||||||
|
|
@ -1,24 +0,0 @@
|
||||||
mod delivery;
|
|
||||||
mod integration;
|
|
||||||
mod logger;
|
|
||||||
|
|
||||||
use logger::setup_logger;
|
|
||||||
use dotenv::dotenv;
|
|
||||||
use anyhow::Result;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use log::info;
|
|
||||||
|
|
||||||
// Arch :
|
|
||||||
// 1) 2 Unix-Socket client (for api grub and preproc) :: i think its a continious process for events when services are unavailable
|
|
||||||
// 2) mpsc beetween `delivery` and `integration` ::
|
|
||||||
// 3) websocket client in `integration` to pull configs from Monitoring System ::
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
|
||||||
async fn main() -> Result<()> {
|
|
||||||
let _ = setup_logger().await?;
|
|
||||||
dotenv().ok();
|
|
||||||
info!("Pulling env vars from .env file if exists ...");
|
|
||||||
println!("Hello, world!");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
@ -4,5 +4,7 @@ version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
anyhow = "1.0.95"
|
||||||
|
chrono = "0.4.40"
|
||||||
serde = { version = "1.0.217", features = ["derive"] }
|
serde = { version = "1.0.217", features = ["derive"] }
|
||||||
serde_json = "1.0.135"
|
serde_json = "1.0.135"
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,10 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
|
use serde_json::{ to_string_pretty, Value };
|
||||||
|
use anyhow::Result;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::fmt::Display;
|
||||||
|
use chrono::{DateTime, Local};
|
||||||
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
|
@ -22,3 +28,390 @@ impl Default for ApiConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// v2
|
||||||
|
#[derive(Serialize, Deserialize, Debug, PartialEq)]
|
||||||
|
pub struct ApiConfigV2 {
|
||||||
|
pub id : u64,
|
||||||
|
#[serde(default)]
|
||||||
|
pub template : Vec<Template>,
|
||||||
|
pub ip_address : String,
|
||||||
|
pub login : Option<String>,
|
||||||
|
pub pass : Option<String>,
|
||||||
|
pub api_key : Option<String>,
|
||||||
|
pub period : u32, // if "0" -> inf
|
||||||
|
pub timeout : u32, // if "0" -> no-delay
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ApiConfigV2 {
|
||||||
|
fn default() -> Self {
|
||||||
|
ApiConfigV2 {
|
||||||
|
id : 0,
|
||||||
|
template : Vec::new(),
|
||||||
|
ip_address : String::from("no_ip"),
|
||||||
|
login : None,
|
||||||
|
pass : None,
|
||||||
|
api_key : None,
|
||||||
|
period : 0,
|
||||||
|
timeout : 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl ApiConfigV2 {
|
||||||
|
pub fn template() -> Self {
|
||||||
|
ApiConfigV2 {
|
||||||
|
id : 1111,
|
||||||
|
template : Vec::new(),
|
||||||
|
ip_address : String::from("ip"),
|
||||||
|
login : None,
|
||||||
|
pass : None,
|
||||||
|
api_key : None,
|
||||||
|
period : 1111,
|
||||||
|
timeout : 1111,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn pattern() -> Self {
|
||||||
|
ApiConfigV2 {
|
||||||
|
id : 1111,
|
||||||
|
template : vec![
|
||||||
|
Template {
|
||||||
|
id : String::from("no id"),
|
||||||
|
name : String::from("open api"),
|
||||||
|
url : String::from("https://dummy-json.mock.beeceptor.com/countries"),
|
||||||
|
method : String::from("GET"),
|
||||||
|
measure : Vec::new(),
|
||||||
|
}
|
||||||
|
],
|
||||||
|
ip_address : String::from("ip"),
|
||||||
|
login : None,
|
||||||
|
pass : None,
|
||||||
|
api_key : None,
|
||||||
|
period : 1,
|
||||||
|
timeout : 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
|
pub struct Template {
|
||||||
|
pub id : String,
|
||||||
|
pub name : String,
|
||||||
|
pub url : String,
|
||||||
|
pub method : String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub measure : Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Template {
|
||||||
|
fn default() -> Self {
|
||||||
|
Template {
|
||||||
|
id : String::from("no-id"),
|
||||||
|
name : String::from("no-name"),
|
||||||
|
url : String::from("no-url"),
|
||||||
|
method : String::from("post"),
|
||||||
|
measure : Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct ProcessedEndpoint {
|
||||||
|
id : String,
|
||||||
|
name : String,
|
||||||
|
url : String,
|
||||||
|
method : String,
|
||||||
|
#[serde(default)]
|
||||||
|
metrics : HashMap<String, Value>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProcessedEndpoint {
|
||||||
|
pub fn new(id: &str, name: &str, url: &str, method: &str, metrics: HashMap<String, Value>) -> Self {
|
||||||
|
ProcessedEndpoint {
|
||||||
|
id : id.to_owned(),
|
||||||
|
name : name.to_owned(),
|
||||||
|
url : url.to_owned(),
|
||||||
|
method : method.to_owned(),
|
||||||
|
metrics : metrics,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn from_target_response(response: &str, keys: &Template) -> Result<String> {
|
||||||
|
let mut hm: HashMap<String, Value> = HashMap::new();
|
||||||
|
let mut response: Value = serde_json::from_str(response)?;
|
||||||
|
|
||||||
|
let _ = keys.measure.iter()
|
||||||
|
.map(|key| (key, response[key].take()))
|
||||||
|
.for_each(|(key, value)| {
|
||||||
|
hm.insert(key.clone(), value);
|
||||||
|
});
|
||||||
|
let val = ProcessedEndpoint::new(&keys.id, &keys.name, &keys.url, &keys.method,hm);
|
||||||
|
Ok(to_string_pretty(&val)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod v3 {
|
||||||
|
pub use super::*;
|
||||||
|
|
||||||
|
// in config
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
pub struct Metric {
|
||||||
|
pub id : String,
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub json_type : String,
|
||||||
|
pub addr : String,
|
||||||
|
}
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
pub struct Metrics {
|
||||||
|
pub name : String,
|
||||||
|
pub url : String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub measure : Vec<Metric>
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
pub struct ConfigEndpoint {
|
||||||
|
pub id : String,
|
||||||
|
pub login : String,
|
||||||
|
#[serde(rename = "pass")]
|
||||||
|
pub password : String,
|
||||||
|
pub api_key : String,
|
||||||
|
period : String,
|
||||||
|
timeout : String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub metrics : Vec<Metrics>,
|
||||||
|
}
|
||||||
|
impl ConfigEndpoint {
|
||||||
|
pub fn from_config(config: Arc<Config>) -> Vec<Arc<Self>> {
|
||||||
|
let mut result: Vec<Arc<ConfigEndpoint>> = Vec::new();
|
||||||
|
config.config
|
||||||
|
.iter()
|
||||||
|
.for_each(|el| {
|
||||||
|
result.push(Arc::new(el.clone()))
|
||||||
|
});
|
||||||
|
result
|
||||||
|
}
|
||||||
|
pub fn get_period(&self) -> Option<u32> {
|
||||||
|
self.period.parse().ok()
|
||||||
|
}
|
||||||
|
pub fn get_timeout(&self) -> Option<u64> {
|
||||||
|
self.timeout.parse().ok()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
pub struct Config {
|
||||||
|
pub config : Vec<ConfigEndpoint>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Config {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
config : Vec::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Config {
|
||||||
|
pub async fn is_default(&self) -> bool {
|
||||||
|
self.config.is_empty()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Credentials {
|
||||||
|
pub endpoint : Arc<ConfigEndpoint>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Credentials {
|
||||||
|
pub fn from_config_endpoint(endpoint: Arc<ConfigEndpoint>) -> Credentials {
|
||||||
|
Self { endpoint }
|
||||||
|
}
|
||||||
|
// pub fn clone(self) -> Self {
|
||||||
|
// Self {
|
||||||
|
// endpoint : self.endpoint.clone()
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
// to prometheus and nmns
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct MetricOutput {
|
||||||
|
pub id : String,
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub json_type : String,
|
||||||
|
pub addr : String,
|
||||||
|
pub value : Value,
|
||||||
|
}
|
||||||
|
impl MetricOutput {
|
||||||
|
pub fn new_with_slices(id : &str, json_type : &str, addr: &str,value : Value) -> Self {
|
||||||
|
MetricOutput {
|
||||||
|
id : id.to_string(),
|
||||||
|
json_type : json_type.to_string(),
|
||||||
|
addr : addr.to_string(),
|
||||||
|
value : value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct PrometheusMetrics {
|
||||||
|
pub service_name: String,
|
||||||
|
pub endpoint_name: String,
|
||||||
|
pub metrics: Vec<MetricOutput>,
|
||||||
|
}
|
||||||
|
impl PrometheusMetrics {
|
||||||
|
pub fn new(service: &str, endpoint: &str, metrics: Vec<MetricOutput>) -> Self {
|
||||||
|
Self {
|
||||||
|
service_name: service.to_string(),
|
||||||
|
endpoint_name: endpoint.to_string(),
|
||||||
|
metrics: metrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn new_zvks(metrics: Vec<MetricOutput>) -> Self {
|
||||||
|
Self {
|
||||||
|
service_name : "zvks".to_owned(),
|
||||||
|
endpoint_name : "apiforsnmp".to_owned(),
|
||||||
|
metrics : metrics,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn get_bytes_len(&self) -> usize {
|
||||||
|
let str_metrics = serde_json::to_vec(self).unwrap_or_else(
|
||||||
|
|_| Vec::new()
|
||||||
|
);
|
||||||
|
str_metrics.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub mod enode_monitoring {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct ForTokenCredentials {
|
||||||
|
login : String,
|
||||||
|
password : String,
|
||||||
|
pub ts : String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ForTokenCredentials {
|
||||||
|
pub fn new(login: &str, pass: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
login : login.to_owned(),
|
||||||
|
password : pass.to_owned(),
|
||||||
|
ts : format!("{}", DateTime::timestamp(&Local::now())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod cmdb {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct Query {
|
||||||
|
id : Vec<String>,
|
||||||
|
data : Data,
|
||||||
|
#[serde(rename = "postQuery")]
|
||||||
|
post_query : String,
|
||||||
|
#[serde(rename = "enableActions")]
|
||||||
|
enable_actions : bool,
|
||||||
|
ts : usize
|
||||||
|
}
|
||||||
|
impl Default for Query {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
id : vec!["/measures/device$18".to_owned()],
|
||||||
|
data : Data::default(),
|
||||||
|
post_query : "links".to_owned(),
|
||||||
|
enable_actions : false,
|
||||||
|
ts : 1740060679399
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct Data {
|
||||||
|
links : Links,
|
||||||
|
fields : Vec<String>,
|
||||||
|
}
|
||||||
|
impl Default for Data {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
links : Links::default(),
|
||||||
|
fields : vec![ "$id".to_owned(), "id".to_owned(), "cls".to_owned(), "name".to_owned()]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct Links {
|
||||||
|
flatten : bool,
|
||||||
|
filter : Filter,
|
||||||
|
}
|
||||||
|
impl Default for Links {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
flatten : true,
|
||||||
|
filter : Filter::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct Filter {
|
||||||
|
cls : String,
|
||||||
|
}
|
||||||
|
impl Default for Filter {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
cls : "measure".to_owned()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// "{\"access_token\":\"5BNQsmiGFQRNA651HeQxZekYgYUAWZ4e\",\"role\":\"administrator\",\"startRT\":\"2025-02-25T09:03:27.581Z\",\"login\":\"admin\",\"push_active\":null,\"$id\":\"systemuser$1\"}",
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct AuthResponse {
|
||||||
|
pub access_token : String,
|
||||||
|
// role : String,
|
||||||
|
// startRT : String,
|
||||||
|
// login : String,
|
||||||
|
// push_active : Value,
|
||||||
|
// #[serde(rename = "$id")]
|
||||||
|
// id : String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait GenericUrl {
|
||||||
|
fn display(&self) -> String;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> GenericUrl for [T]
|
||||||
|
where T : Display {
|
||||||
|
fn display(&self) -> String {
|
||||||
|
let mut vec: Vec<String> = Vec::new();
|
||||||
|
vec.push("%5B".to_owned());
|
||||||
|
self.iter()
|
||||||
|
.enumerate()
|
||||||
|
.for_each(|(id, val)| {
|
||||||
|
if id > 0 {
|
||||||
|
vec.push(",".to_owned());
|
||||||
|
}
|
||||||
|
vec.push(format!("%22{}%22", val));
|
||||||
|
});
|
||||||
|
vec.push("%5D".to_owned());
|
||||||
|
vec.concat()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_chunk_size(total_measures: usize) -> usize {
|
||||||
|
match total_measures {
|
||||||
|
0..=144 => total_measures,
|
||||||
|
145..=288 => total_measures / 4,
|
||||||
|
289..=432 => total_measures / 5,
|
||||||
|
433..=576 => total_measures / 6,
|
||||||
|
577..=720 => total_measures / 7,
|
||||||
|
721..=864 => total_measures / 8,
|
||||||
|
865..=1008 => total_measures / 9,
|
||||||
|
_ => total_measures / 10,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -4,3 +4,11 @@ version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
anyhow = "1.0.95"
|
||||||
|
chrono = "0.4.39"
|
||||||
|
dotenv = "0.15.0"
|
||||||
|
env_logger = "0.11.6"
|
||||||
|
log = "0.4.25"
|
||||||
|
serde = { version = "1.0.217", features = ["derive"] }
|
||||||
|
serde_json = "1.0.137"
|
||||||
|
tokio = { version = "1.43.0", features = ["full"] }
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
// mod for prpeproc config pulling and updating
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod config_unittests {
|
||||||
|
use tokio::test;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
async fn create_unix_socket_server() { assert!(true) }
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
async fn verify_on_valid_config() { assert!(true) }
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
async fn verify_on_invalid_config() { assert!(true) }
|
||||||
|
}
|
||||||
|
|
@ -1,3 +1,18 @@
|
||||||
fn main() {
|
mod config;
|
||||||
println!("Hello, world!");
|
mod transform;
|
||||||
|
mod logger;
|
||||||
|
|
||||||
|
use logger::setup_logger;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use anyhow::Result;
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
|
async fn main() -> Result<()>{
|
||||||
|
let _ = setup_logger().await?;
|
||||||
|
|
||||||
|
info!("Pulling env vars from .env file if exists ...");
|
||||||
|
dotenv().ok();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
// mod for preproccessing and transfering to the CM metrics data
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
{
|
||||||
|
"config": [
|
||||||
|
{
|
||||||
|
"id":"demo_vcs_vinteo_dev_api",
|
||||||
|
"login" : "",
|
||||||
|
"pass" : "",
|
||||||
|
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
||||||
|
"period" : "",
|
||||||
|
"timeout" : "3",
|
||||||
|
"metrics" : [
|
||||||
|
{
|
||||||
|
"name": "conferences",
|
||||||
|
"url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
|
||||||
|
"measure": [
|
||||||
|
{ "id":"number", "type": "text", "addr": "data.conferences[].number" },
|
||||||
|
{ "id":"total", "type": "integer", "addr": "data.total" },
|
||||||
|
{ "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
|
||||||
|
{ "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
|
||||||
|
{ "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "abonents",
|
||||||
|
"url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
|
||||||
|
"measure": [
|
||||||
|
{ "id":"total", "type": "integer", "addr": "data.total" }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue