Merge pull request 'feature/1117' (#8) from feature/1117 into rc

Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/8
pull/9/head v1.0.1
Vladislav Drozdov 2025-03-04 15:49:22 +03:00
commit 8e32de3be3
13 changed files with 205 additions and 291 deletions

View File

@@ -1,7 +1,7 @@
 [workspace]
 resolver = "2"
 members = [
-    "crates/api-grub", "crates/integr-structs", "crates/preproc",
+    "crates/api-grub", "crates/integr-structs",
 ]
 [profile.dev]

View File

@@ -1,15 +1,47 @@
 # Integration module for the "Буревестник ВКС" project
+## Description
 `integr_mod` is a Rust package that provides the integration-module functionality for the "Буревестник ВКС" project; it consists of binary crates for:
 - fetching data via the ВКС API
 - storing, validating, and keeping its own configurations up to date
 - preprocessing the fetched data and ~~sending it to the `Monitoring System`~~ saving it to the database
+## Guide
+1. Fill in the .env file, or set the environment variables, following the example in the `.env.example` file
+``` toml
+# Template .env for API grabber
+# Prometheus-Exporter info
+EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
+# eNODE.Monitoring configuration
+ENODE_MONITORING_IP = "ip.ip.ip.ip"
+# admin user is required
+ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"
+# admin password is required
+ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
+```
+2. Build the project with:
+``` bash
+cargo build --release
+```
+3. Run it
+> Debug build
+``` bash
+cargo run --bin api-grub
+```
+or
+> Release build
+``` bash
+cargo run --release --bin api-grub
+```
 ## Current Progress
 | Crate (submodule) | Progress |
 |---|---|
-|`api-grub` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
-|`config-delivery` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
+|`api-grub` | ✅✅✅✅✅✅✅✅✅🛠️ |
+|`config-delivery [migrated]` | ❌❌❌❌❌❌❌❌❌❌ |
 |`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
-|`preproc` [temp-deprecated] | ✅✅✅❌❌❌❌❌❌❌ | (development temporarily paused)
+|`preproc` [temp-deprecated] | ❌❌❌❌❌❌❌❌❌❌ | (development temporarily paused)

View File

@@ -1,6 +1,6 @@
 [package]
 name = "api-grub"
-version = "0.3.13"
+version = "1.0.1"
 edition = "2021"
 [dependencies]

View File

@@ -1,7 +1,6 @@
 // mod to communicate with api-grub config file
 // 1) check changes in unix-socket
 // 2) save changes in local config file
-use integr_structs::api::ApiConfigV2;
 use anyhow::{Error, Ok, Result};
 use log::{info, warn, error};
 use std::{fs, path::Path};
@@ -15,19 +14,9 @@ use integr_structs::api::v3::Config;
 const CONFIG_PATH: &str = "config_api.json";
 const SOCKET_PATH: &str = "api-grub.sock";
-// todo! rewrite to use current_exe
+// TODO: rewrite to use current_exe
 pub async fn pull_local_config() -> Result<Config> {
-    // let conf_path = std::env::current_exe()?;
     let path = Path::new(CONFIG_PATH);
-    // return match conf_path.parent() {
-    //     Some(dir) => {
-    //         let config: ApiConfig = from_str(
-    //             &fs::read_to_string(dir.join(CONFIG_PATH))?
-    //         )?;
-    //         Ok(config)
-    //     },
-    //     None => Err(Error::msg("No local conf was found"))
-    // }
     if path.exists() && path.is_file() {
         let config: Config = from_str(
             &fs::read_to_string(CONFIG_PATH)?
@@ -39,14 +28,13 @@ pub async fn pull_local_config() -> Result<Config> {
 }
 // for config pulling
-// ++++ reader to channel
 pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
     info!("Initializing Unix-Socket listening for pulling new configs...");
     let server = init_unix_listener().await?;
-    //
     info!("Listening Unix-Socket...");
     let mut buffer = String::new();
-    //
     loop {
         if let stdOk((mut stream, _)) = server.accept().await {
             if let Err(er) = stream.read_to_string(&mut buffer).await {
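The listener above accepts a connection on `api-grub.sock`, reads the stream to EOF, and parses the buffer as a v3 `Config`. A minimal client sketch for pushing a new config over that socket — the payload shape here is a placeholder; the real schema is `integr_structs::api::v3::Config`:

```rust
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // SOCKET_PATH from the module above, resolved relative to the working directory.
    let mut stream = UnixStream::connect("api-grub.sock").await?;
    // Hypothetical payload; the actual field layout lives in integr_structs::api::v3::Config.
    stream.write_all(br#"{"config": []}"#).await?;
    // Closing the write half sends EOF, so the server's read_to_string returns.
    stream.shutdown().await?;
    Ok(())
}
```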

View File

@@ -1,9 +1,9 @@
 use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
-use integr_structs::api::v3::PrometheusMetrics;
+use integr_structs::api::v3::{PrometheusMetrics, PrometheusMetricsExtended};
 use reqwest::Client;
 use tokio_postgres::NoTls;
 use std::env;
-use anyhow::{Result};
+use anyhow::Result;
 use log::{info, error};
 pub struct Exporter {
@@ -34,36 +34,43 @@ impl Exporter {
         },
     }
 }
+#[allow(unused)]
 pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
 pub fn init() -> Self {
     Self {
         pool : Self::pool_construct(),
     }
 }
+#[allow(unused)]
 pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
     if let Some(pool) = &self.pool {
         return Some(pool.get().await.ok()?);
     }
     None
 }
+#[allow(unused)]
 pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
-    // client.
     let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
     let _ = client.query(&query, &[&metrics]).await?;
     Ok(())
 }
 pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
     let url = env::var("EXPORTER_URL")?;
-    // let req = Request::new(Method::PUT,
-    //     Url::parse(metrics)?);
-    // dbg!(&metrics);
     let req = Client::new()
         .post(url)
         .json(&metrics)
         .send().await;
-    // dbg!(&req);
-    // dbg!(&req.unwrap().text().await);
-    // todo : rewrite with status code wrapping
     req?;
     Ok(metrics.get_bytes_len())
 }
+pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
+    let url = env::var("EXPORTER_URL")?;
+    let req = Client::new()
+        .post(url)
+        .json(&metrics)
+        .send().await;
+    req?;
+    Ok(metrics.get_bytes_len())
+}
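`export_data` assumes a `metrics` table with a `body` column already exists; no DDL ships in this PR. A hypothetical bootstrap sketch — the connection string and the `id` column are assumptions:

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connection parameters are placeholders; the crate builds its pool from env/config.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres dbname=metrics", NoTls).await?;
    // tokio_postgres requires the connection future to be driven separately.
    tokio::spawn(connection);
    client
        .batch_execute(
            "CREATE TABLE IF NOT EXISTS metrics (id BIGSERIAL PRIMARY KEY, body TEXT NOT NULL);",
        )
        .await?;
    Ok(())
}
```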

View File

@@ -4,34 +4,30 @@ use serde_json::{Map, Value};
 use reqwest::Client;
 use tokio::sync::Semaphore;
 use std::sync::Arc;
-// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl};
-use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size};
-// use crate::structs::cmdb::Query;
+use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size};
 use integr_structs::api::enode_monitoring::cmdb::Query;
 use tokio::task::JoinHandle;
-// use crate::structs::get_chunk_size;
 use std::pin::Pin;
 use std::future::Future;
-use integr_structs::api::v3::{MetricOutput, PrometheusMetrics};
+use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
 use log::{error, info, warn};
-// use chrono::{Local, DateTime};
+use std::collections::HashMap;
 pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
     let timer = tokio::time::Instant::now();
     'outer: loop {
         let mut a = MonitoringImporter::new().await;
         a.start_session().await?;
+        let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
         'inner: loop {
             if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
                 break 'outer;
             }
-            let vec = a.get_metrics_list().await.unwrap_or_else(|_| vec![]);
-            if vec.is_empty() {
+            if let Err(_) = a.get_measure_info(vec.clone()).await {
                 warn!("Session dropped, creating new ...");
                 break 'inner;
             }
-            let _ = a.get_measure_info(Arc::new(vec)).await;
-            // a.close_session().await?;
             tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
         }
     }
@@ -70,36 +66,24 @@ impl MonitoringImporter {
         let client = Client::new();
         let url = format!("http://{}/e-data-front/auth/login", self.ip);
         let fortoken = ForTokenCredentials::new(&self.login, &self.password);
-        // dbg!(&fortoken);
         let client = client
             .post(url)
             .header("Content-Type", "application/json")
             .json(&fortoken);
         let resp = client.send().await?;
         let auth = resp.json::<AuthResponse>().await?;
-        // dbg!(&auth);
         self.set_ts(&fortoken.ts).await;
         self.access_token = auth.access_token.to_owned();
         Ok(())
     }
-    pub async fn close_session(&mut self) -> anyhow::Result<()> {
-        let client = Client::new();
-        let url = format!("http://{}/e-data-front/auth/logout", self.ip);
-        let client = client
-            .post(url)
-            .header("Content-Type", "application/json")
-            .header("access-token", &self.access_token);
-        let _ = client.send().await?;
-        self.access_token.clear();
-        Ok(())
-    }
-    pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<String>> {
+    pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
         let client = Client::new();
-        let mut vec: Vec<String> = Vec::new();
+        let mut vec: Vec<(String, String)> = Vec::new();
         let url = format!("http://{}/e-cmdb/api/query", self.ip);
         let client = client
             .post(url)
@@ -107,30 +91,30 @@ impl MonitoringImporter {
             .header("access-token", &self.access_token)
             .json(&Query::default());
         let resp = client.send().await?.text().await?;
-        // dbg!(&resp.text().await);
         let resp: Value = serde_json::from_str(&resp)?;
         if let Some(arr) = resp.as_array() {
             for measure in arr {
                 let id = measure.get("id");
                 let cls = measure.get("cls");
+                let name = measure.get("name");
                 if id.is_some() && cls.is_some() {
                     // todo: later wait for Vitaliy's call on classification
                     let id = id.unwrap().as_i64().unwrap_or_default();
                     let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
+                    let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
                     if cls.is_empty() {
                         return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
                     }
-                    // let measure_name = format!("{}${}", cls, id);
-                    vec.push(format!("{}${}", cls, id));
+                    vec.push((format!("{}${}", cls, id), name.to_string()));
                 }
             }
-            // dbg!(vec);
         } else {
             return Err(Error::msg("Invalid JSON in response"));
         }
+        info!("List of measures was pulled, total - {}", &vec.len());
         Ok(vec)
     }
-    pub async fn get_measure_info(&self, measures: Arc<Vec<String>>) -> anyhow::Result<()> {
+    pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
         let mut sys = sysinfo::System::new();
         sys.refresh_cpu_all();
         // adaptive permits on task spawn to prevent system overload
@@ -139,43 +123,40 @@ impl MonitoringImporter {
         let client = Arc::new(Client::new());
         let measures = measures.clone();
         let arc = Arc::new(self.clone());
-        // dbg!(&measures.display());
-        // dbg!(&measures.len());
-        for measure in measures.chunks(get_chunk_size(measures.len())) {
+        let chunk_size = get_chunk_size(measures.len());
+        info!("List of measures was divided by chunks with len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size);
+        for measure in measures.chunks(chunk_size) {
             let permit = sem.clone();
             let arc = arc.clone();
             let client = client.clone();
+            let hm = measure.lazy_unzip();
             let measure = Arc::new(measure.display());
             let _permit = permit.acquire().await.unwrap();
-            let jh: JoinHandle<anyhow::Result<PrometheusMetrics>> = tokio::spawn(async move {
-                Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await
+            let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
+                Self::process_endpoint(measure.clone(), client.clone(), arc.clone(), &hm).await
             });
             jh_vec.push(jh);
         }
-        // let mut vals = Vec::new();
        for event in jh_vec {
             match event.await {
                 Ok(val) => {
                     if let Ok(val) = val {
-                        match crate::export::Exporter::export_metrics(val).await {
-                            Ok(bytes) => info!("Successfully transmitted {} bytes to the Prometheus exporter", bytes),
+                        match crate::export::Exporter::export_extended_metrics(val).await {
+                            Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometheus exporter", bytes)},
                             Err(er) => error!("Cannot export data to the Prometheus exporter due to : `{}`", er),
                         }
-                        // vals.push(val);
                     }
                 },
                 Err(er) => println!("Fatal error on async task: {}", er),
             }
         }
-        // dbg!(&vals);
-        // dbg!(&vals.len());
         Ok(())
     }
-    async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>) -> anyhow::Result<PrometheusMetrics> {
+    async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
         let resp = client
             .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
             .header("Content-Type", "application/json")
@@ -185,23 +166,20 @@ impl MonitoringImporter {
         tokio::task::yield_now().await;
         let resp: Value = serde_json::from_str(&resp)?;
-        // let a = Self::extract_metric_data(resp);
         Ok(
-            PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await
+            PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
         )
     }
-    fn extract_metric_data(json: Value) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutput>>> + Send>> {
+    fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
         Box::pin(async move {
             return match json {
                 Value::Object(obj) => {
-                    // let resp: Value = serde_json::from_str(&obj)?;
-                    return Ok(vec![Self::process_value(&obj).await?])
+                    return Ok(vec![Self::process_value(&obj, hm).await?])
                 },
                 Value::Array(arr) => {
                     let mut vec = Vec::new();
                     for obj in arr {
-                        if let Ok(mut val) = Self::extract_metric_data(obj).await {
+                        if let Ok(mut val) = Self::extract_metric_data(obj, hm).await {
                             // vec.push(val);
                             vec.append(&mut val);
                         }
@@ -212,43 +190,50 @@ impl MonitoringImporter {
             }
         })
     }
-    async fn process_value(obj : &Map<String, Value>) -> anyhow::Result<MetricOutput> {
-        let id = obj.get("id");
+    async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
+        let id = obj.get("$id");
         let val = obj.get("value");
+        let description = {
+            let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null);
+            let zero = String::new();
+            if dola_ip.is_null() {
+                zero
+            } else {
+                hm.get(
+                    dola_ip.as_str().unwrap_or_else(|| "")
+                )
+                .unwrap_or_else(|| &zero)
+                .to_owned()
+            }
+        };
         if id.is_none() || val.is_none() {
             return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
         }
-        let id = id.unwrap().as_str().unwrap_or_else(|| "");
+        let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_");
         let val = val.unwrap();
         if id.is_empty() {
             return Err(Error::msg("Empty `id` field. Invalid JSON response"))
         }
-        // pub struct MetricOutput {
-        //     pub id : String,
-        //     #[serde(rename = "type")]
-        //     json_type : String,
-        //     addr : String,
-        //     pub value : Value,
-        // }
-        Ok(MetricOutput {
+        Ok(MetricOutputExtended {
             id : id.to_owned(),
             json_type : match val {
                 Value::Number(val) => {
                     if val.is_i64() {
                         "i64".to_owned()
                     } else if val.is_u64() {
                         "u64".to_owned()
                     } else {
                         "f64".to_owned()
                     }
                 },
                 _ => "unknown".to_owned(),
             },
             addr : "enode.monitoring.api".to_owned(),
+            desc : description,
             value : val.clone()
         })
     }
 }
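`get_measure_info` gates every `tokio::spawn` behind a semaphore sized from CPU info, so only a bounded number of chunk requests are in flight at once. A standalone sketch of that pattern, written with the owned-permit variant — the cap of 8 is illustrative; the importer derives its own limit:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 8 tasks run concurrently; the rest wait here for a permit.
    let sem = Arc::new(Semaphore::new(8));
    let mut handles = Vec::new();
    for chunk_id in 0..32 {
        // Acquire before spawning, so spawning itself is throttled.
        let permit = sem.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // released when the task finishes
            println!("processing chunk {chunk_id}");
        }));
    }
    for handle in handles {
        let _ = handle.await;
    }
}
```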

View File

@@ -1,22 +1,18 @@
 // module to handle unix-socket connection + pulling info from api
 use anyhow::Result;
-// use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
 use log::{error, info};
 use rand::random;
 use tokio::sync::mpsc::Receiver;
 use tokio::time::{sleep, Duration};
-use reqwest::{Client, Method};
+use reqwest::Client;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 use tokio::task::JoinHandle;
-// use tokio::sync::Mutex;
 use dotenv::dotenv;
 use crate::json::JsonParser;
 use crate::export::Exporter;
 use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
-// use md5::compute;
-// type BufferType = Arc<Mutex<Vec<String>>>;
 // for api info pulling
 pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {
@@ -42,32 +38,13 @@ pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>)
         }
         let shared_pool = shared_pool.clone();
         info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
-        // sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
     }
-    // Ok(())
 }
-struct RestMethod;
-impl RestMethod {
-    pub async fn from_str(method: &str) -> Method {
-        return match method.trim().to_lowercase().as_str() {
-            "post" => Method::POST,
-            "patch" => Method::PATCH,
-            "put" => Method::PUT,
-            "delete" => Method::DELETE,
-            "head" => Method::HEAD,
-            "trace" => Method::TRACE,
-            "options" => Method::OPTIONS,
-            "connect" => Method::CONNECT,
-            "get" | _ => Method::GET
-        }
-    }
-}
 struct ApiPoll<'a> {
     config : &'a mut Config,
+    #[allow(unused)]
     client : Client,
 }
@@ -78,26 +55,18 @@ impl<'a> ApiPoll<'a> {
             client : Client::new(),
         }
     }
-    // can be weak and with bug test needed
     pub async fn change_config(&mut self, conf: Config) {
         *self.config = conf;
     }
     pub async fn is_default(&self) -> bool {
         self.config.is_default().await
     }
-    // pub async fn get_delay(&self) -> u32 {
-    //     self.config.timeout
-    // }
     pub async fn process_metrics(
         service_id: Arc<String>,
         metrics: Arc<Metrics>,
         creds: Credentials,
-        // exporter: Arc<Exporter>
     ) -> Result<()> {
         // processing metrics
-        // let mut req = Client::new()
-        //     // .user_agent("api_grub/integration_module")
-        //     .get(&metrics.url);
         use std::hash::DefaultHasher;
         let rand = random::<char>();
@@ -113,32 +82,15 @@ impl<'a> ApiPoll<'a> {
         let api_key = &creds.endpoint.api_key;
         if !login.is_empty() && !password.is_empty() {
-            // dbg!("kjgbkasgksjd");
             req = req.basic_auth(login, Some(password));
         }
         if !api_key.is_empty() {
-            // req = req.bearer_auth(&api_key);
-            // req = req.header("authorization", "bearer ");
             req = req.header("accept", "application/json");
             req = req.header("x-api-key", api_key);
-            // req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
         }
-        // dbg!(&req);
-        // let (client, res) = req.build_split();
-        // let res = res.unwrap();
-        // res.url_mut().is_special()
-        // dbg!(client);
-        // dbg!(res);
-        // todo!();
         match req.send().await {
             Ok(resp) => {
-                // dbg!(&resp.text().await);
                 if let Ok(response) = resp.text().await {
                     match serde_json::to_value(&response) {
                         Err(er) => {
@@ -147,7 +99,6 @@ impl<'a> ApiPoll<'a> {
                         Ok(_) => {
                             let endpoint_name = &metrics.name;
                             let preproc = JsonParser::parse(&metrics.measure, &response);
-                            // dbg!(serde_json::to_string_pretty(&preproc));
                             let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
                             match Exporter::export_metrics(preproc).await {
                                 Ok(bytes) => {
@@ -170,18 +121,15 @@ impl<'a> ApiPoll<'a> {
         Ok(())
     }
     pub async fn process_endpoint(
-        // client : Arc<Client>,
         config : Arc<ConfigEndpoint>,
         creds : Credentials,
-        // exporter : Arc<Exporter>
     ) -> Result<()> {
-        // // TODO: HAVE TO BE USED
-        let period = config.get_period().unwrap_or(0);
+        let _period = config.get_period().unwrap_or(0);
         let timeout = config.get_timeout().unwrap_or(5);
         let metrics = Arc::new(config.metrics.clone());
         let service_id = Arc::new(config.id.clone());
         loop {
-            // let exporter = exporter.clone();
             let creds = creds.clone();
             let metrics = metrics.clone();
             let service_id = service_id.clone();
@@ -211,28 +159,23 @@ impl<'a> ApiPoll<'a> {
             // processing
             sleep(Duration::from_secs(timeout)).await
         }
-        Ok(())
     }
     pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
-        // let buffer: BufferType = Arc::new(Mutex::new(vec![]));
-        // let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
-        // let client = Arc::new(self.client.clone());
         let config = Arc::new(self.config.clone());
         let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
         let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
         for (idx, _) in config.config.iter().enumerate() {
-            // let for_creds = endpoints[idx].clone();
             let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
             let endpoint = endpoints[idx].clone();
-            // let client = client.clone();
-            // TODO: USE EXPORTER
+            #[allow(unused)]
             let exporter = exporter.clone();
             let join_handler = tokio::spawn(async move {
                 Self::process_endpoint(
-                    // client,
                     endpoint,
                     creds,
-                    // exporter.clone()
                 ).await
             });
             join_handles.push(join_handler);
@@ -245,6 +188,7 @@ impl<'a> ApiPoll<'a> {
     }
 }
+// TODO: FIX TESTS
 // #[cfg(test)]
 // mod net_unittests {
 //     use super::*;

View File

@@ -243,7 +243,7 @@ pub mod v3 {
         pub value : Value,
     }
     impl MetricOutput {
-        pub fn new_with_slices(id : &str, json_type : &str, addr: &str,value : Value) -> Self {
+        pub fn new_with_slices(id : &str, json_type : &str, addr: &str, value : Value) -> Self {
             MetricOutput {
                 id : id.to_string(),
                 json_type : json_type.to_string(),
@@ -252,6 +252,27 @@ pub mod v3 {
             }
         }
     }
+    #[derive(Serialize, Deserialize, Debug)]
+    pub struct MetricOutputExtended {
+        pub id : String,
+        #[serde(rename = "type")]
+        pub json_type : String,
+        pub addr : String,
+        pub value : Value,
+        #[serde(rename = "description")]
+        pub desc : String,
+    }
+    impl MetricOutputExtended {
+        pub fn new_with_slices(id : &str, json_type : &str, addr: &str, desc : &str, value : Value) -> Self {
+            MetricOutputExtended {
+                id : id.to_string(),
+                json_type : json_type.to_string(),
+                addr : addr.to_string(),
+                value : value,
+                desc : desc.to_string(),
+            }
+        }
+    }
     #[derive(Serialize, Deserialize, Debug)]
     pub struct PrometheusMetrics {
@@ -281,10 +302,33 @@ pub mod v3 {
             str_metrics.len()
         }
     }
+    #[derive(Serialize, Deserialize, Debug)]
+    pub struct PrometheusMetricsExtended {
+        pub service_name: String,
+        pub endpoint_name: String,
+        pub metrics: Vec<MetricOutputExtended>,
+    }
+    impl PrometheusMetricsExtended {
+        pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
+            Self {
+                service_name : "zvks".to_owned(),
+                endpoint_name : "apiforsnmp".to_owned(),
+                metrics : metrics,
+            }
+        }
+        pub fn get_bytes_len(&self) -> usize {
+            let str_metrics = serde_json::to_vec(self).unwrap_or_else(
+                |_| Vec::new()
+            );
+            str_metrics.len()
+        }
+    }
 }
 pub mod enode_monitoring {
+    use std::hash::Hash;
     use super::*;
     #[derive(Debug, Serialize)]
@@ -383,7 +427,14 @@ pub mod enode_monitoring {
         fn display(&self) -> String;
     }
-    impl<T> GenericUrl for [T]
+    pub trait LazyUnzip<K, V>
+    where
+        V : Clone,
+        K : Hash + Eq + Clone {
+        fn lazy_unzip(&self) -> HashMap<K, V>;
+    }
+    impl<T> GenericUrl for [(T, T)]
     where T : Display {
         fn display(&self) -> String {
             let mut vec: Vec<String> = Vec::new();
@@ -394,12 +445,23 @@ pub mod enode_monitoring {
                 if id > 0 {
                     vec.push(",".to_owned());
                 }
-                vec.push(format!("%22{}%22", val));
+                vec.push(format!("%22{}%22", val.0));
             });
             vec.push("%5D".to_owned());
             vec.concat()
         }
     }
+    impl<K, V> LazyUnzip<K, V> for [(K, V)]
+    where
+        V : Clone,
+        K : Hash + Eq + Clone {
+        fn lazy_unzip(&self) -> HashMap<K, V> {
+            let mut hm = HashMap::new();
+            self.into_iter()
+                .for_each(|(key, val)| {hm.insert(key.to_owned(), val.to_owned());});
+            hm
+        }
+    }
     pub fn get_chunk_size(total_measures: usize) -> usize {
         match total_measures {
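A usage sketch for the two slice helpers above, assuming both traits are imported; the leading `%5B` push sits above this hunk, so the exact output of `display` is inferred:

```rust
use integr_structs::api::enode_monitoring::{GenericUrl, LazyUnzip};
use std::collections::HashMap;

fn main() {
    // (measure key, human-readable name) pairs, as returned by get_metrics_list
    let measures = vec![
        ("cls$101".to_string(), "core-router".to_string()),
        ("cls$102".to_string(), "edge-switch".to_string()),
    ];
    // lazy_unzip builds the key -> name lookup passed into process_endpoint
    let hm: HashMap<String, String> = measures.as_slice().lazy_unzip();
    assert_eq!(hm.get("cls$101").map(String::as_str), Some("core-router"));
    // display renders the keys as a URL-encoded JSON array for the measure request,
    // e.g. %5B%22cls$101%22,%22cls$102%22%5D
    println!("{}", measures.as_slice().display());
}
```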

View File

@@ -1,14 +0,0 @@
-[package]
-name = "preproc"
-version = "0.1.0"
-edition = "2021"
-[dependencies]
-anyhow = "1.0.95"
-chrono = "0.4.39"
-dotenv = "0.15.0"
-env_logger = "0.11.6"
-log = "0.4.25"
-serde = { version = "1.0.217", features = ["derive"] }
-serde_json = "1.0.137"
-tokio = { version = "1.43.0", features = ["full"] }

View File

@@ -1,21 +0,0 @@
-// mod for preproc config pulling and updating
-#[cfg(test)]
-mod config_unittests {
-    use tokio::test;
-    #[test]
-    async fn create_unix_socket_server() { assert!(true) }
-    #[test]
-    async fn verify_on_valid_config() { assert!(true) }
-    #[test]
-    async fn verify_on_invalid_config() { assert!(true) }
-}

View File

@@ -1,50 +0,0 @@
-use chrono::Local;
-use env_logger::Builder;
-use log::LevelFilter;
-use std::io::Write;
-use anyhow::Result;
-use log::info;
-pub async fn setup_logger() -> Result<()> {
-    Builder::new()
-        .format(move |buf, record| {
-            writeln!(
-                buf,
-                "|{}| {} [{}] - {}",
-                "config-delivery",
-                Local::now().format("%d-%m-%Y %H:%M:%S"),
-                record.level(),
-                record.args(),
-            )
-        })
-        .filter(None, LevelFilter::Info)
-        .target(env_logger::Target::Stdout)
-        .init();
-    info!("Logger configured");
-    Ok(())
-}
-#[cfg(test)]
-mod logger_unittests {
-    use tokio::test;
-    use super::*;
-    #[test]
-    async fn check_logger_builder() {
-        Builder::new()
-            .format(move |buf, record| {
-                writeln!(
-                    buf,
-                    "|{}| {} [{}] - {}",
-                    "config-delivery",
-                    Local::now().format("%d-%m-%Y %H:%M:%S"),
-                    record.level(),
-                    record.args(),
-                )
-            })
-            .filter(None, LevelFilter::Info)
-            .target(env_logger::Target::Stdout)
-            .init();
-    }
-}
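For reference, the format closure above produces log lines shaped like this (timestamp illustrative):

```
|config-delivery| 04-03-2025 15:49:22 [INFO] - Logger configured
```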

View File

@@ -1,18 +0,0 @@
-mod config;
-mod transform;
-mod logger;
-use logger::setup_logger;
-use dotenv::dotenv;
-use anyhow::Result;
-use log::info;
-#[tokio::main(flavor = "multi_thread")]
-async fn main() -> Result<()> {
-    let _ = setup_logger().await?;
-    info!("Pulling env vars from .env file if exists ...");
-    dotenv().ok();
-    Ok(())
-}

View File

@@ -1 +0,0 @@
-// mod for preprocessing metrics data and transferring it to the CM