Compare commits
No commits in common. "68bee74756b723e3f9f82a8768fd49832915e8d2" and "c6e24cae4256b468f416781c9df8bdad9fc0f9fe" have entirely different histories.
68bee74756
...
c6e24cae42
|
|
@ -1,7 +1,7 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
members = [
|
members = [
|
||||||
"crates/api-grub", "crates/integr-structs",
|
"crates/api-grub", "crates/integr-structs", "crates/preproc",
|
||||||
]
|
]
|
||||||
|
|
||||||
[profile.dev]
|
[profile.dev]
|
||||||
|
|
|
||||||
42
README.md
42
README.md
|
|
@ -1,47 +1,15 @@
|
||||||
# Интеграционный модуль для проекта "Буревестник ВКС"
|
# Интеграционный модуль для проекта "Буревестник ВКС"
|
||||||
|
|
||||||
## Описание
|
|
||||||
`integr_mod` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
|
`integr_mod` - Rust-пакет, предоставляющий функционал интеграционного модуля в проекте "Буревестник ВКС", состоящий из бинарных крейтов для:
|
||||||
- получение данных через API ВКС
|
- получение данных через API ВКС
|
||||||
- поддержку хранения, валидации и актуализации собственных конфигураций
|
- поддержку хранения, валидации и актуализации собственных конфигураций
|
||||||
- предобработку полученных данных и ~~отправку `Системе Мониторинга`~~ сохранение в БД
|
- предобработку полученных данных и ~~отправку `Системе Мониторинга`~~ сохранение в БД
|
||||||
|
|
||||||
## Руководство
|
## Current Progress
|
||||||
|
|
||||||
1. Заполнить .env файл или установить переменные окружения в соотвествии с примером в `.env.example` файле
|
| Crate (submodule) | Progress |
|
||||||
``` toml
|
|
||||||
# Template .env for API grabber
|
|
||||||
|
|
||||||
# Prometheus-Exporter info
|
|
||||||
EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
|
|
||||||
|
|
||||||
# eNODE.Monitoring configuration
|
|
||||||
ENODE_MONITORING_IP = "ip.ip.ip.ip"
|
|
||||||
# admin user is required
|
|
||||||
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"
|
|
||||||
# admin password is required
|
|
||||||
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
|
|
||||||
```
|
|
||||||
2. Произвести сборку проекта командой :
|
|
||||||
``` bash
|
|
||||||
cargo build --release
|
|
||||||
```
|
|
||||||
|
|
||||||
3. Запустить
|
|
||||||
> Debug версия
|
|
||||||
``` bash
|
|
||||||
cargo run --bin api-grub
|
|
||||||
```
|
|
||||||
или
|
|
||||||
> Release версия
|
|
||||||
``` bash
|
|
||||||
cargo run --release --bin api-grub
|
|
||||||
```
|
|
||||||
## Текущий прогресс
|
|
||||||
|
|
||||||
| Крейт (подмодуль) | Прогресс |
|
|
||||||
|---|---|
|
|---|---|
|
||||||
|`api-grub` | ✅✅✅✅✅✅✅✅✅🛠️ |
|
|`api-grub` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
|
||||||
|`config-delivery [migrated]` | ❌❌❌❌❌❌❌❌❌❌ |
|
|`config-delivery` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
|
||||||
|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
|
|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
|
||||||
|`preproc` [temp-deprecated] | ❌❌❌❌❌❌❌❌❌❌ | (разработка временно остановлена)
|
|`preproc` [temp-deprecated] | ✅✅✅❌❌❌❌❌❌❌ | (разработка временно остановлена)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "api-grub"
|
name = "api-grub"
|
||||||
version = "1.0.1"
|
version = "0.3.13"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
// mod to communicate with api-grub config file
|
// mod to communicate with api-grub config file
|
||||||
// 1) check changes in unix-socket
|
// 1) check changes in unix-socket
|
||||||
// 2) save changes in local config file
|
// 2) save changes in local config file
|
||||||
|
use integr_structs::api::ApiConfigV2;
|
||||||
use anyhow::{Error, Ok, Result};
|
use anyhow::{Error, Ok, Result};
|
||||||
use log::{info, warn, error};
|
use log::{info, warn, error};
|
||||||
use std::{fs, path::Path};
|
use std::{fs, path::Path};
|
||||||
|
|
@ -14,9 +15,19 @@ use integr_structs::api::v3::Config;
|
||||||
const CONFIG_PATH: &str = "config_api.json";
|
const CONFIG_PATH: &str = "config_api.json";
|
||||||
const SOCKET_PATH: &str = "api-grub.sock";
|
const SOCKET_PATH: &str = "api-grub.sock";
|
||||||
|
|
||||||
// TODO: rewrite to use current_exe
|
// todo! rewrite to use current_exe
|
||||||
pub async fn pull_local_config() -> Result<Config> {
|
pub async fn pull_local_config() -> Result<Config> {
|
||||||
|
// let conf_path = std::env::current_exe()?;
|
||||||
let path = Path::new(CONFIG_PATH);
|
let path = Path::new(CONFIG_PATH);
|
||||||
|
// return match conf_path.parent() {
|
||||||
|
// Some(dir) => {
|
||||||
|
// let config: ApiConfig = from_str(
|
||||||
|
// &fs::read_to_string(dir.join(CONFIG_PATH))?
|
||||||
|
// )?;
|
||||||
|
// Ok(config)
|
||||||
|
// },
|
||||||
|
// None => Err(Error::msg("No local conf was found"))
|
||||||
|
// }
|
||||||
if path.exists() && path.is_file() {
|
if path.exists() && path.is_file() {
|
||||||
let config: Config = from_str(
|
let config: Config = from_str(
|
||||||
&fs::read_to_string(CONFIG_PATH)?
|
&fs::read_to_string(CONFIG_PATH)?
|
||||||
|
|
@ -28,13 +39,14 @@ pub async fn pull_local_config() -> Result<Config> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// for config pulling
|
// for config pulling
|
||||||
|
// ++++ reader to channel
|
||||||
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
|
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
|
||||||
info!("Initializing Unix-Socket listening for pulling new configs...");
|
info!("Initializing Unix-Socket listening for pulling new configs...");
|
||||||
let server = init_unix_listener().await?;
|
let server = init_unix_listener().await?;
|
||||||
|
//
|
||||||
info!("Listening Unix-Socket...");
|
info!("Listening Unix-Socket...");
|
||||||
let mut buffer = String::new();
|
let mut buffer = String::new();
|
||||||
|
//
|
||||||
loop {
|
loop {
|
||||||
if let stdOk((mut stream, _)) = server.accept().await {
|
if let stdOk((mut stream, _)) = server.accept().await {
|
||||||
if let Err(er) = stream.read_to_string(&mut buffer).await {
|
if let Err(er) = stream.read_to_string(&mut buffer).await {
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,9 @@
|
||||||
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
|
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
|
||||||
use integr_structs::api::v3::{PrometheusMetrics, PrometheusMetricsExtended};
|
use integr_structs::api::v3::PrometheusMetrics;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use tokio_postgres::NoTls;
|
use tokio_postgres::NoTls;
|
||||||
use std::env;
|
use std::env;
|
||||||
use anyhow::Result;
|
use anyhow::{Result};
|
||||||
use log::{info, error};
|
use log::{info, error};
|
||||||
|
|
||||||
pub struct Exporter {
|
pub struct Exporter {
|
||||||
|
|
@ -34,43 +34,36 @@ impl Exporter {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[allow(unused)]
|
|
||||||
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
||||||
pub fn init() -> Self {
|
pub fn init() -> Self {
|
||||||
Self {
|
Self {
|
||||||
pool : Self::pool_construct(),
|
pool : Self::pool_construct(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[allow(unused)]
|
|
||||||
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
||||||
if let Some(pool) = &self.pool {
|
if let Some(pool) = &self.pool {
|
||||||
return Some(pool.get().await.ok()?);
|
return Some(pool.get().await.ok()?);
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
#[allow(unused)]
|
|
||||||
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||||
|
// client.
|
||||||
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
||||||
let _ = client.query(&query, &[&metrics]).await?;
|
let _ = client.query(&query, &[&metrics]).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
|
// let req = Request::new(Method::PUT,
|
||||||
let req = Client::new()
|
// Url::parse(metrics)?);
|
||||||
.post(url)
|
// dbg!(&metrics);
|
||||||
.json(&metrics)
|
|
||||||
.send().await;
|
|
||||||
|
|
||||||
req?;
|
|
||||||
Ok(metrics.get_bytes_len())
|
|
||||||
}
|
|
||||||
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
|
|
||||||
let url = env::var("EXPORTER_URL")?;
|
|
||||||
let req = Client::new()
|
let req = Client::new()
|
||||||
.post(url)
|
.post(url)
|
||||||
.json(&metrics)
|
.json(&metrics)
|
||||||
.send().await;
|
.send().await;
|
||||||
|
// dbg!(&req);
|
||||||
|
// dbg!(&req.unwrap().text().await);
|
||||||
|
// todo : rewrite with status code wrapping
|
||||||
req?;
|
req?;
|
||||||
Ok(metrics.get_bytes_len())
|
Ok(metrics.get_bytes_len())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,30 +4,34 @@ use serde_json::{Map, Value};
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size};
|
// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl};
|
||||||
|
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size};
|
||||||
|
// use crate::structs::cmdb::Query;
|
||||||
use integr_structs::api::enode_monitoring::cmdb::Query;
|
use integr_structs::api::enode_monitoring::cmdb::Query;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
// use crate::structs::get_chunk_size;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
|
use integr_structs::api::v3::{MetricOutput, PrometheusMetrics};
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use std::collections::HashMap;
|
// use chrono::{Local, DateTime};
|
||||||
|
|
||||||
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
||||||
let timer = tokio::time::Instant::now();
|
let timer = tokio::time::Instant::now();
|
||||||
'outer: loop {
|
'outer: loop {
|
||||||
let mut a = MonitoringImporter::new().await;
|
let mut a = MonitoringImporter::new().await;
|
||||||
a.start_session().await?;
|
a.start_session().await?;
|
||||||
let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
|
|
||||||
|
|
||||||
'inner: loop {
|
'inner: loop {
|
||||||
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
|
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
|
||||||
break 'outer;
|
break 'outer;
|
||||||
}
|
}
|
||||||
if let Err(_) = a.get_measure_info(vec.clone()).await {
|
let vec = a.get_metrics_list().await.unwrap_or_else(|_| vec![]);
|
||||||
|
if vec.is_empty() {
|
||||||
warn!("Session dropped, creating new ...");
|
warn!("Session dropped, creating new ...");
|
||||||
break 'inner;
|
break 'inner;
|
||||||
}
|
}
|
||||||
|
let _ = a.get_measure_info(Arc::new(vec)).await;
|
||||||
|
// a.close_session().await?;
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -66,24 +70,36 @@ impl MonitoringImporter {
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
||||||
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
||||||
|
// dbg!(&fortoken);
|
||||||
let client = client
|
let client = client
|
||||||
.post(url)
|
.post(url)
|
||||||
.header("Content-Type", "application/json")
|
.header("Content-Type", "application/json")
|
||||||
.json(&fortoken);
|
.json(&fortoken);
|
||||||
let resp = client.send().await?;
|
let resp = client.send().await?;
|
||||||
let auth = resp.json::<AuthResponse>().await?;
|
let auth = resp.json::<AuthResponse>().await?;
|
||||||
|
// dbg!(&auth);
|
||||||
self.set_ts(&fortoken.ts).await;
|
self.set_ts(&fortoken.ts).await;
|
||||||
|
|
||||||
self.access_token = auth.access_token.to_owned();
|
self.access_token = auth.access_token.to_owned();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
pub async fn close_session(&mut self) -> anyhow::Result<()> {
|
||||||
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
|
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let mut vec: Vec<(String, String)> = Vec::new();
|
let url = format!("http://{}/e-data-front/auth/logout", self.ip);
|
||||||
|
let client = client
|
||||||
|
.post(url)
|
||||||
|
.header("Content-Type", "application/json")
|
||||||
|
.header("access-token", &self.access_token);
|
||||||
|
|
||||||
|
let _ = client.send().await?;
|
||||||
|
|
||||||
|
self.access_token.clear();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<String>> {
|
||||||
|
let client = Client::new();
|
||||||
|
let mut vec: Vec<String> = Vec::new();
|
||||||
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
||||||
let client = client
|
let client = client
|
||||||
.post(url)
|
.post(url)
|
||||||
|
|
@ -91,30 +107,30 @@ impl MonitoringImporter {
|
||||||
.header("access-token", &self.access_token)
|
.header("access-token", &self.access_token)
|
||||||
.json(&Query::default());
|
.json(&Query::default());
|
||||||
let resp = client.send().await?.text().await?;
|
let resp = client.send().await?.text().await?;
|
||||||
|
// dbg!(&resp.text().await);
|
||||||
let resp: Value = serde_json::from_str(&resp)?;
|
let resp: Value = serde_json::from_str(&resp)?;
|
||||||
if let Some(arr) = resp.as_array() {
|
if let Some(arr) = resp.as_array() {
|
||||||
for measure in arr {
|
for measure in arr {
|
||||||
let id = measure.get("id");
|
let id = measure.get("id");
|
||||||
let cls = measure.get("cls");
|
let cls = measure.get("cls");
|
||||||
let name = measure.get("name");
|
|
||||||
if id.is_some() && cls.is_some() {
|
if id.is_some() && cls.is_some() {
|
||||||
// todo: later wait for Vaitaliy call of classification
|
// todo: later wait for Vaitaliy call of classification
|
||||||
let id = id.unwrap().as_i64().unwrap_or_default();
|
let id = id.unwrap().as_i64().unwrap_or_default();
|
||||||
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
||||||
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
|
|
||||||
if cls.is_empty() {
|
if cls.is_empty() {
|
||||||
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
|
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
|
||||||
}
|
}
|
||||||
vec.push((format!("{}${}", cls, id), name.to_string()));
|
// let measure_name = format!("{}${}", cls, id);
|
||||||
|
vec.push(format!("{}${}", cls, id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// dbg!(vec);
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::msg("Invalid JSON in response"));
|
return Err(Error::msg("Invalid JSON in response"));
|
||||||
}
|
}
|
||||||
info!("List of measures was pulled, total - {}", &vec.len());
|
|
||||||
Ok(vec)
|
Ok(vec)
|
||||||
}
|
}
|
||||||
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
|
pub async fn get_measure_info(&self, measures: Arc<Vec<String>>) -> anyhow::Result<()> {
|
||||||
let mut sys = sysinfo::System::new();
|
let mut sys = sysinfo::System::new();
|
||||||
sys.refresh_cpu_all();
|
sys.refresh_cpu_all();
|
||||||
// adaptive permition on task spawm to prevent system overload
|
// adaptive permition on task spawm to prevent system overload
|
||||||
|
|
@ -123,40 +139,43 @@ impl MonitoringImporter {
|
||||||
let client = Arc::new(Client::new());
|
let client = Arc::new(Client::new());
|
||||||
let measures = measures.clone();
|
let measures = measures.clone();
|
||||||
let arc = Arc::new(self.clone());
|
let arc = Arc::new(self.clone());
|
||||||
let chunk_size = get_chunk_size(measures.len());
|
// dbg!(&measures.display());
|
||||||
info!("List of measures was divided by chunks with len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size);
|
|
||||||
|
|
||||||
for measure in measures.chunks(chunk_size) {
|
// dbg!(&measures.len());
|
||||||
|
for measure in measures.chunks(get_chunk_size(measures.len())) {
|
||||||
let permit = sem.clone();
|
let permit = sem.clone();
|
||||||
let arc = arc.clone();
|
let arc = arc.clone();
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
let hm = measure.lazy_unzip();
|
|
||||||
let measure = Arc::new(measure.display());
|
let measure = Arc::new(measure.display());
|
||||||
|
|
||||||
let _permit = permit.acquire().await.unwrap();
|
let _permit = permit.acquire().await.unwrap();
|
||||||
|
|
||||||
let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
|
let jh: JoinHandle<anyhow::Result<PrometheusMetrics>> = tokio::spawn(async move {
|
||||||
Self::process_endpoint(measure.clone(), client.clone(), arc.clone(), &hm).await
|
Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await
|
||||||
|
|
||||||
});
|
});
|
||||||
jh_vec.push(jh);
|
jh_vec.push(jh);
|
||||||
}
|
}
|
||||||
|
// let mut vals = Vec::new();
|
||||||
for event in jh_vec {
|
for event in jh_vec {
|
||||||
match event.await {
|
match event.await {
|
||||||
Ok(val) => {
|
Ok(val) => {
|
||||||
if let Ok(val) = val {
|
if let Ok(val) = val {
|
||||||
match crate::export::Exporter::export_extended_metrics(val).await {
|
match crate::export::Exporter::export_metrics(val).await {
|
||||||
Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes)},
|
Ok(bytes) => info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes),
|
||||||
Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er),
|
Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er),
|
||||||
}
|
}
|
||||||
|
// vals.push(val);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(er) => println!("Fatal error on async task: {}", er),
|
Err(er) => println!("Fatal error on async task: {}", er),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// dbg!(&vals);
|
||||||
|
// dbg!(&vals.len());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
|
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>) -> anyhow::Result<PrometheusMetrics> {
|
||||||
let resp = client
|
let resp = client
|
||||||
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
||||||
.header("Content-Type", "application/json")
|
.header("Content-Type", "application/json")
|
||||||
|
|
@ -166,20 +185,23 @@ impl MonitoringImporter {
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
let resp: Value = serde_json::from_str(&resp)?;
|
let resp: Value = serde_json::from_str(&resp)?;
|
||||||
|
// let a = Self::extract_metric_data(resp);
|
||||||
|
|
||||||
Ok(
|
Ok(
|
||||||
PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
|
PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
|
fn extract_metric_data(json: Value) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutput>>> + Send>> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
return match json {
|
return match json {
|
||||||
Value::Object(obj) => {
|
Value::Object(obj) => {
|
||||||
return Ok(vec![Self::process_value(&obj, hm).await?])
|
// let resp: Value = serde_json::from_str(&obj)?;
|
||||||
|
return Ok(vec![Self::process_value(&obj).await?])
|
||||||
},
|
},
|
||||||
Value::Array(arr) => {
|
Value::Array(arr) => {
|
||||||
let mut vec = Vec::new();
|
let mut vec = Vec::new();
|
||||||
for obj in arr {
|
for obj in arr {
|
||||||
if let Ok(mut val) = Self::extract_metric_data(obj, hm).await {
|
if let Ok(mut val) = Self::extract_metric_data(obj).await {
|
||||||
// vec.push(val);
|
// vec.push(val);
|
||||||
vec.append(&mut val);
|
vec.append(&mut val);
|
||||||
}
|
}
|
||||||
|
|
@ -190,34 +212,28 @@ impl MonitoringImporter {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
|
async fn process_value(obj : &Map<String, Value>) -> anyhow::Result<MetricOutput> {
|
||||||
let id = obj.get("$id");
|
let id = obj.get("id");
|
||||||
let val = obj.get("value");
|
let val = obj.get("value");
|
||||||
let description = {
|
|
||||||
let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null);
|
|
||||||
let zero = String::new();
|
|
||||||
if dola_ip.is_null() {
|
|
||||||
zero
|
|
||||||
} else {
|
|
||||||
hm.get(
|
|
||||||
dola_ip.as_str().unwrap_or_else(|| "")
|
|
||||||
)
|
|
||||||
.unwrap_or_else(|| &zero)
|
|
||||||
.to_owned()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if id.is_none() || val.is_none() {
|
if id.is_none() || val.is_none() {
|
||||||
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
|
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
|
||||||
}
|
}
|
||||||
let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_");
|
let id = id.unwrap().as_str().unwrap_or_else(|| "");
|
||||||
let val = val.unwrap();
|
let val = val.unwrap();
|
||||||
|
|
||||||
if id.is_empty() {
|
if id.is_empty() {
|
||||||
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
|
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
|
||||||
}
|
}
|
||||||
|
// pub struct MetricOutput {
|
||||||
|
// pub id : String,
|
||||||
|
// #[serde(rename = "type")]
|
||||||
|
// json_type : String,
|
||||||
|
// addr : String,
|
||||||
|
// pub value : Value,
|
||||||
|
// }
|
||||||
|
|
||||||
Ok(MetricOutputExtended {
|
Ok(MetricOutput {
|
||||||
id : id.to_owned(),
|
id : id.to_owned(),
|
||||||
json_type : match val {
|
json_type : match val {
|
||||||
Value::Number(val) => {
|
Value::Number(val) => {
|
||||||
|
|
@ -232,7 +248,6 @@ impl MonitoringImporter {
|
||||||
_ => "unknown".to_owned(),
|
_ => "unknown".to_owned(),
|
||||||
},
|
},
|
||||||
addr : "enode.monitoring.api".to_owned(),
|
addr : "enode.monitoring.api".to_owned(),
|
||||||
desc : description,
|
|
||||||
value : val.clone()
|
value : val.clone()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,18 +1,22 @@
|
||||||
// module to handle unix-socket connection + pulling info from api
|
// module to handle unix-socket connection + pulling info from api
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
// use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use rand::random;
|
use rand::random;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use reqwest::Client;
|
use reqwest::{Client, Method};
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
// use tokio::sync::Mutex;
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use crate::json::JsonParser;
|
use crate::json::JsonParser;
|
||||||
use crate::export::Exporter;
|
use crate::export::Exporter;
|
||||||
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
|
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
|
||||||
|
// use md5::compute;
|
||||||
|
|
||||||
|
// type BufferType = Arc<Mutex<Vec<String>>>;
|
||||||
|
|
||||||
// for api info pulling
|
// for api info pulling
|
||||||
pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {
|
pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {
|
||||||
|
|
@ -38,13 +42,32 @@ pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>)
|
||||||
}
|
}
|
||||||
let shared_pool = shared_pool.clone();
|
let shared_pool = shared_pool.clone();
|
||||||
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
|
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
|
||||||
|
// sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct RestMethod;
|
||||||
|
|
||||||
|
impl RestMethod {
|
||||||
|
pub async fn from_str(method: &str) -> Method {
|
||||||
|
return match method.trim().to_lowercase().as_str() {
|
||||||
|
"post" => Method::POST,
|
||||||
|
"patch" => Method::PATCH,
|
||||||
|
"put" => Method::PUT,
|
||||||
|
"delete" => Method::DELETE,
|
||||||
|
"head" => Method::HEAD,
|
||||||
|
"trace" => Method::TRACE,
|
||||||
|
"options" => Method::OPTIONS,
|
||||||
|
"connect" => Method::CONNECT,
|
||||||
|
"get" | _ => Method::GET
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
struct ApiPoll<'a> {
|
struct ApiPoll<'a> {
|
||||||
config : &'a mut Config,
|
config : &'a mut Config,
|
||||||
#[allow(unused)]
|
|
||||||
client : Client,
|
client : Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -55,18 +78,26 @@ impl<'a> ApiPoll<'a> {
|
||||||
client : Client::new(),
|
client : Client::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// can be weak and with bug test needed
|
||||||
pub async fn change_config(&mut self, conf: Config) {
|
pub async fn change_config(&mut self, conf: Config) {
|
||||||
*self.config = conf;
|
*self.config = conf;
|
||||||
}
|
}
|
||||||
pub async fn is_default(&self) -> bool {
|
pub async fn is_default(&self) -> bool {
|
||||||
self.config.is_default().await
|
self.config.is_default().await
|
||||||
}
|
}
|
||||||
|
// pub async fn get_delay(&self) -> u32 {
|
||||||
|
// self.config.timeout
|
||||||
|
// }
|
||||||
pub async fn process_metrics(
|
pub async fn process_metrics(
|
||||||
service_id: Arc<String>,
|
service_id: Arc<String>,
|
||||||
metrics: Arc<Metrics>,
|
metrics: Arc<Metrics>,
|
||||||
creds: Credentials,
|
creds: Credentials,
|
||||||
|
// exporter: Arc<Exporter>
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// processing metrics
|
// processing metrics
|
||||||
|
// let mut req = Client::new()
|
||||||
|
// // .user_agent("api_grub/integration_module")
|
||||||
|
// .get(&metrics.url);
|
||||||
use std::hash::DefaultHasher;
|
use std::hash::DefaultHasher;
|
||||||
|
|
||||||
let rand = random::<char>();
|
let rand = random::<char>();
|
||||||
|
|
@ -82,15 +113,32 @@ impl<'a> ApiPoll<'a> {
|
||||||
let api_key = &creds.endpoint.api_key;
|
let api_key = &creds.endpoint.api_key;
|
||||||
|
|
||||||
if !login.is_empty() && !password.is_empty() {
|
if !login.is_empty() && !password.is_empty() {
|
||||||
|
// dbg!("kjgbkasgksjd");
|
||||||
req = req.basic_auth(login, Some(password));
|
req = req.basic_auth(login, Some(password));
|
||||||
}
|
}
|
||||||
if !api_key.is_empty() {
|
if !api_key.is_empty() {
|
||||||
|
// req = req.bearer_auth(&api_key);
|
||||||
|
// req = req.header("authorization", "bearer ");
|
||||||
|
|
||||||
req = req.header("accept", "application/json");
|
req = req.header("accept", "application/json");
|
||||||
req = req.header("x-api-key", api_key);
|
req = req.header("x-api-key", api_key);
|
||||||
|
|
||||||
|
// req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
|
||||||
}
|
}
|
||||||
|
// dbg!(&req);
|
||||||
|
// let (client, res) = req.build_split();
|
||||||
|
// let res = res.unwrap();
|
||||||
|
// res.url_mut().is_special()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// dbg!(client);
|
||||||
|
// dbg!(res);
|
||||||
|
// todo!();
|
||||||
|
|
||||||
match req.send().await {
|
match req.send().await {
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
|
// dbg!(&resp.text().await);
|
||||||
if let Ok(response) = resp.text().await {
|
if let Ok(response) = resp.text().await {
|
||||||
match serde_json::to_value(&response) {
|
match serde_json::to_value(&response) {
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
|
|
@ -99,6 +147,7 @@ impl<'a> ApiPoll<'a> {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
let endpoint_name = &metrics.name;
|
let endpoint_name = &metrics.name;
|
||||||
let preproc = JsonParser::parse(&metrics.measure, &response);
|
let preproc = JsonParser::parse(&metrics.measure, &response);
|
||||||
|
// dbg!(serde_json::to_string_pretty(&preproc));
|
||||||
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
|
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
|
||||||
match Exporter::export_metrics(preproc).await {
|
match Exporter::export_metrics(preproc).await {
|
||||||
Ok(bytes) => {
|
Ok(bytes) => {
|
||||||
|
|
@ -121,15 +170,18 @@ impl<'a> ApiPoll<'a> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn process_endpoint(
|
pub async fn process_endpoint(
|
||||||
|
// client : Arc<Client>,
|
||||||
config : Arc<ConfigEndpoint>,
|
config : Arc<ConfigEndpoint>,
|
||||||
creds : Credentials,
|
creds : Credentials,
|
||||||
|
// exporter : Arc<Exporter>
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// TODO: HAVE TO BE USED
|
//
|
||||||
let _period = config.get_period().unwrap_or(0);
|
let period = config.get_period().unwrap_or(0);
|
||||||
let timeout = config.get_timeout().unwrap_or(5);
|
let timeout = config.get_timeout().unwrap_or(5);
|
||||||
let metrics = Arc::new(config.metrics.clone());
|
let metrics = Arc::new(config.metrics.clone());
|
||||||
let service_id = Arc::new(config.id.clone());
|
let service_id = Arc::new(config.id.clone());
|
||||||
loop {
|
loop {
|
||||||
|
// let exporter = exporter.clone();
|
||||||
let creds = creds.clone();
|
let creds = creds.clone();
|
||||||
let metrics = metrics.clone();
|
let metrics = metrics.clone();
|
||||||
let service_id = service_id.clone();
|
let service_id = service_id.clone();
|
||||||
|
|
@ -159,23 +211,28 @@ impl<'a> ApiPoll<'a> {
|
||||||
// processing
|
// processing
|
||||||
sleep(Duration::from_secs(timeout)).await
|
sleep(Duration::from_secs(timeout)).await
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
|
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
|
||||||
|
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
|
||||||
|
// let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
|
// let client = Arc::new(self.client.clone());
|
||||||
let config = Arc::new(self.config.clone());
|
let config = Arc::new(self.config.clone());
|
||||||
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
|
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
|
||||||
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
|
|
||||||
for (idx, _) in config.config.iter().enumerate() {
|
for (idx, _) in config.config.iter().enumerate() {
|
||||||
|
// let for_creds = endpoints[idx].clone();
|
||||||
let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
|
let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
|
||||||
let endpoint = endpoints[idx].clone();
|
let endpoint = endpoints[idx].clone();
|
||||||
|
// let client = client.clone();
|
||||||
// TODO: USE EXPORTER
|
|
||||||
#[allow(unused)]
|
|
||||||
let exporter = exporter.clone();
|
let exporter = exporter.clone();
|
||||||
let join_handler = tokio::spawn(async move {
|
let join_handler = tokio::spawn(async move {
|
||||||
Self::process_endpoint(
|
Self::process_endpoint(
|
||||||
|
// client,
|
||||||
endpoint,
|
endpoint,
|
||||||
creds,
|
creds,
|
||||||
|
// exporter.clone()
|
||||||
).await
|
).await
|
||||||
});
|
});
|
||||||
join_handles.push(join_handler);
|
join_handles.push(join_handler);
|
||||||
|
|
@ -188,7 +245,6 @@ impl<'a> ApiPoll<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: FIX TESTS
|
|
||||||
// #[cfg(test)]
|
// #[cfg(test)]
|
||||||
// mod net_unittests {
|
// mod net_unittests {
|
||||||
// use super::*;
|
// use super::*;
|
||||||
|
|
|
||||||
|
|
@ -252,27 +252,6 @@ pub mod v3 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
|
||||||
pub struct MetricOutputExtended {
|
|
||||||
pub id : String,
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
pub json_type : String,
|
|
||||||
pub addr : String,
|
|
||||||
pub value : Value,
|
|
||||||
#[serde(rename = "description")]
|
|
||||||
pub desc : String,
|
|
||||||
}
|
|
||||||
impl MetricOutputExtended {
|
|
||||||
pub fn new_with_slices(id : &str, json_type : &str, addr: &str, desc : &str, value : Value) -> Self {
|
|
||||||
MetricOutputExtended {
|
|
||||||
id : id.to_string(),
|
|
||||||
json_type : json_type.to_string(),
|
|
||||||
addr : addr.to_string(),
|
|
||||||
value : value,
|
|
||||||
desc : desc.to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub struct PrometheusMetrics {
|
pub struct PrometheusMetrics {
|
||||||
|
|
@ -302,33 +281,10 @@ pub mod v3 {
|
||||||
str_metrics.len()
|
str_metrics.len()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
|
||||||
pub struct PrometheusMetricsExtended {
|
|
||||||
pub service_name: String,
|
|
||||||
pub endpoint_name: String,
|
|
||||||
pub metrics: Vec<MetricOutputExtended>,
|
|
||||||
}
|
|
||||||
impl PrometheusMetricsExtended {
|
|
||||||
pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
|
|
||||||
Self {
|
|
||||||
service_name : "zvks".to_owned(),
|
|
||||||
endpoint_name : "apiforsnmp".to_owned(),
|
|
||||||
metrics : metrics,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub fn get_bytes_len(&self) -> usize {
|
|
||||||
let str_metrics = serde_json::to_vec(self).unwrap_or_else(
|
|
||||||
|_| Vec::new()
|
|
||||||
);
|
|
||||||
str_metrics.len()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub mod enode_monitoring {
|
pub mod enode_monitoring {
|
||||||
use std::hash::Hash;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
#[derive(Debug, Serialize)]
|
||||||
|
|
@ -427,14 +383,7 @@ pub mod enode_monitoring {
|
||||||
fn display(&self) -> String;
|
fn display(&self) -> String;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait LazyUnzip<K, V>
|
impl<T> GenericUrl for [T]
|
||||||
where
|
|
||||||
V : Clone,
|
|
||||||
K : Hash + Eq + Clone {
|
|
||||||
fn lazy_unzip(&self) -> HashMap<K, V>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> GenericUrl for [(T, T)]
|
|
||||||
where T : Display {
|
where T : Display {
|
||||||
fn display(&self) -> String {
|
fn display(&self) -> String {
|
||||||
let mut vec: Vec<String> = Vec::new();
|
let mut vec: Vec<String> = Vec::new();
|
||||||
|
|
@ -445,23 +394,12 @@ pub mod enode_monitoring {
|
||||||
if id > 0 {
|
if id > 0 {
|
||||||
vec.push(",".to_owned());
|
vec.push(",".to_owned());
|
||||||
}
|
}
|
||||||
vec.push(format!("%22{}%22", val.0));
|
vec.push(format!("%22{}%22", val));
|
||||||
});
|
});
|
||||||
vec.push("%5D".to_owned());
|
vec.push("%5D".to_owned());
|
||||||
vec.concat()
|
vec.concat()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl<K, V> LazyUnzip<K, V> for [(K, V)]
|
|
||||||
where
|
|
||||||
V : Clone,
|
|
||||||
K : Hash + Eq + Clone {
|
|
||||||
fn lazy_unzip(&self) -> HashMap<K, V> {
|
|
||||||
let mut hm = HashMap::new();
|
|
||||||
self.into_iter()
|
|
||||||
.for_each(|(key, val)| {hm.insert(key.to_owned(), val.to_owned());});
|
|
||||||
hm
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_chunk_size(total_measures: usize) -> usize {
|
pub fn get_chunk_size(total_measures: usize) -> usize {
|
||||||
match total_measures {
|
match total_measures {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
[package]
|
||||||
|
name = "preproc"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0.95"
|
||||||
|
chrono = "0.4.39"
|
||||||
|
dotenv = "0.15.0"
|
||||||
|
env_logger = "0.11.6"
|
||||||
|
log = "0.4.25"
|
||||||
|
serde = { version = "1.0.217", features = ["derive"] }
|
||||||
|
serde_json = "1.0.137"
|
||||||
|
tokio = { version = "1.43.0", features = ["full"] }
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
// mod for prpeproc config pulling and updating
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod config_unittests {
|
||||||
|
use tokio::test;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
async fn create_unix_socket_server() { assert!(true) }
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
async fn verify_on_valid_config() { assert!(true) }
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
async fn verify_on_invalid_config() { assert!(true) }
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,50 @@
|
||||||
|
use chrono::Local;
|
||||||
|
use env_logger::Builder;
|
||||||
|
use log::LevelFilter;
|
||||||
|
use std::io::Write;
|
||||||
|
use anyhow::Result;
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
pub async fn setup_logger() -> Result<()> {
|
||||||
|
Builder::new()
|
||||||
|
.format(move |buf, record| {
|
||||||
|
writeln!(
|
||||||
|
buf,
|
||||||
|
"|{}| {} [{}] - {}",
|
||||||
|
"config-delivery",
|
||||||
|
Local::now().format("%d-%m-%Y %H:%M:%S"),
|
||||||
|
record.level(),
|
||||||
|
record.args(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.filter(None, LevelFilter::Info)
|
||||||
|
.target(env_logger::Target::Stdout)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
info!("Logger configured");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod logger_unittests {
|
||||||
|
use tokio::test;
|
||||||
|
use super::*;
|
||||||
|
#[test]
|
||||||
|
async fn check_logger_builder() {
|
||||||
|
Builder::new()
|
||||||
|
.format(move |buf, record| {
|
||||||
|
writeln!(
|
||||||
|
buf,
|
||||||
|
"|{}| {} [{}] - {}",
|
||||||
|
"config-delivery",
|
||||||
|
Local::now().format("%d-%m-%Y %H:%M:%S"),
|
||||||
|
record.level(),
|
||||||
|
record.args(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.filter(None, LevelFilter::Info)
|
||||||
|
.target(env_logger::Target::Stdout)
|
||||||
|
.init();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,18 @@
|
||||||
|
mod config;
|
||||||
|
mod transform;
|
||||||
|
mod logger;
|
||||||
|
|
||||||
|
use logger::setup_logger;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use anyhow::Result;
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
|
async fn main() -> Result<()>{
|
||||||
|
let _ = setup_logger().await?;
|
||||||
|
|
||||||
|
info!("Pulling env vars from .env file if exists ...");
|
||||||
|
dotenv().ok();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
// mod for preproccessing and transfering to the CM metrics data
|
||||||
Loading…
Reference in New Issue