diff --git a/Cargo.toml b/Cargo.toml
index 8f70c37..97287a1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,7 +1,7 @@
 [workspace]
 resolver = "2"
 members = [
-    "crates/api-grub", "crates/integr-structs", "crates/preproc",
+    "crates/api-grub", "crates/integr-structs",
 ]
 
 [profile.dev]
diff --git a/README.md b/README.md
index 46b067c..71c25e4 100644
--- a/README.md
+++ b/README.md
@@ -1,15 +1,47 @@
 # Integration module for the "Буревестник ВКС" project
+## Description
 
 `integr_mod` is a Rust package providing the integration-module functionality for the "Буревестник ВКС" project. It consists of binary crates for:
 - fetching data via the ВКС API
 - storing, validating, and keeping its own configurations up to date
 - preprocessing the fetched data and ~~sending it to the `Monitoring System`~~ saving it to the DB
 
-## Current Progress
+## Guide
 
-| Crate (submodule) | Progress |
+1. Fill in the .env file, or set the environment variables, following the example in the `.env.example` file:
+``` toml
+# Template .env for API grabber
+
+# Prometheus-Exporter info
+EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
+
+# eNODE.Monitoring configuration
+ENODE_MONITORING_IP = "ip.ip.ip.ip"
+# admin user is required
+ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring"
+# admin password is required
+ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring"
+```
+2. Build the project with:
+``` bash
+cargo build --release
+```
+
+3. Run it:
+> Debug build
+``` bash
+cargo run --bin api-grub
+```
+or
+> Release build
+``` bash
+cargo run --release --bin api-grub
+```
+## Current progress
+
+| Crate (submodule) | Progress |
 |---|---|
-|`api-grub` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
-|`config-delivery` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ |
+|`api-grub` | ✅✅✅✅✅✅✅✅✅🛠️ |
+|`config-delivery [migrated]` | ❌❌❌❌❌❌❌❌❌❌ |
 |`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ |
-|`preproc` [temp-deprecated] | ✅✅✅❌❌❌❌❌❌❌ | (development temporarily paused)
+|`preproc` [temp-deprecated] | ❌❌❌❌❌❌❌❌❌❌ | (development temporarily paused)
diff --git a/crates/api-grub/Cargo.toml b/crates/api-grub/Cargo.toml
index 0e54639..5926743 100644
--- a/crates/api-grub/Cargo.toml
+++ b/crates/api-grub/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "api-grub"
-version = "0.3.13"
+version = "1.0.1"
 edition = "2021"
 
 [dependencies]
diff --git a/crates/api-grub/src/config.rs b/crates/api-grub/src/config.rs
index b945d31..2d8c835 100644
--- a/crates/api-grub/src/config.rs
+++ b/crates/api-grub/src/config.rs
@@ -1,7 +1,6 @@
 // mod to communicate with api-grub config file
 // 1) check changes in unix-socket
 // 2) save changes in local config file
-use integr_structs::api::ApiConfigV2;
 use anyhow::{Error, Ok, Result};
 use log::{info, warn, error};
 use std::{fs, path::Path};
@@ -15,19 +14,9 @@ use integr_structs::api::v3::Config;
 const CONFIG_PATH: &str = "config_api.json";
 const SOCKET_PATH: &str = "api-grub.sock";
 
-// todo! rewrite to use current_exe
+// TODO: rewrite to use current_exe
 pub async fn pull_local_config() -> Result<Config> {
-    // let conf_path = std::env::current_exe()?;
     let path = Path::new(CONFIG_PATH);
-    // return match conf_path.parent() {
-    //     Some(dir) => {
-    //         let config: ApiConfig = from_str(
-    //             &fs::read_to_string(dir.join(CONFIG_PATH))?
-    //         )?;
-    //         Ok(config)
-    //     },
-    //     None => Err(Error::msg("No local conf was found"))
-    // }
     if path.exists() && path.is_file() {
         let config: Config = from_str(
             &fs::read_to_string(CONFIG_PATH)?
@@ -39,14 +28,13 @@ pub async fn pull_local_config() -> Result<Config> {
 }
 
 // for config pulling
-// ++++ reader to channel
 pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
     info!("Initializing Unix-Socket listening for pulling new configs...");
     let server = init_unix_listener().await?;
-    //
+
     info!("Listening Unix-Socket...");
     let mut buffer = String::new();
-    //
+
     loop {
         if let stdOk((mut stream, _)) = server.accept().await {
             if let Err(er) = stream.read_to_string(&mut buffer).await {
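Editor's note on the socket protocol above: `init_config_grub_mechanism` reads one whole JSON document per connection (`read_to_string` runs until EOF). A minimal client-side sketch for pushing a new config, assuming `api-grub.sock` sits in the working directory and the payload has the same shape as `config_api.json`:

``` rust
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical pusher: hand a new config to the running api-grub.
    let json = std::fs::read_to_string("config_api.json")?;
    let mut stream = UnixStream::connect("api-grub.sock").await?;
    stream.write_all(json.as_bytes()).await?;
    // Close the write half so the listener's `read_to_string` sees EOF.
    stream.shutdown().await?;
    Ok(())
}
```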
diff --git a/crates/api-grub/src/export.rs b/crates/api-grub/src/export.rs
index ed7ef88..a984f7f 100644
--- a/crates/api-grub/src/export.rs
+++ b/crates/api-grub/src/export.rs
@@ -1,9 +1,9 @@
 use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
-use integr_structs::api::v3::PrometheusMetrics;
+use integr_structs::api::v3::{PrometheusMetrics, PrometheusMetricsExtended};
 use reqwest::Client;
 use tokio_postgres::NoTls;
 use std::env;
-use anyhow::{Result};
+use anyhow::Result;
 use log::{info, error};
 
 pub struct Exporter {
@@ -34,36 +34,43 @@ impl Exporter {
             },
         }
     }
+    #[allow(unused)]
     pub fn is_no_connection(&self) -> bool {
         self.pool.is_none()
     }
     pub fn init() -> Self {
         Self {
             pool : Self::pool_construct(),
         }
     }
+    #[allow(unused)]
     pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
         if let Some(pool) = &self.pool {
            return Some(pool.get().await.ok()?);
         }
         None
     }
+    #[allow(unused)]
     pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
-        // client.
         let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
         let _ = client.query(&query, &[&metrics]).await?;
         Ok(())
     }
     pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
         let url = env::var("EXPORTER_URL")?;
-        // let req = Request::new(Method::PUT,
-        //     Url::parse(metrics)?);
-        // dbg!(&metrics);
+
+        let req = Client::new()
+            .post(url)
+            .json(&metrics)
+            .send().await;
+
+        req?;
+        Ok(metrics.get_bytes_len())
+    }
+    pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
+        let url = env::var("EXPORTER_URL")?;
         let req = Client::new()
             .post(url)
             .json(&metrics)
             .send().await;
-        // dbg!(&req);
-        // dbg!(&req.unwrap().text().await);
-        // todo : rewrite with status code wrapping
         req?;
         Ok(metrics.get_bytes_len())
     }
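Reviewer note on `export_metrics` / `export_extended_metrics`: the removed `// todo : rewrite with status code wrapping` still applies, because `req?;` only surfaces transport errors and lets 4xx/5xx responses pass silently. A sketch of the stricter variant (the function name is illustrative, not part of the patch):

``` rust
use anyhow::Result;
use reqwest::Client;
use serde::Serialize;

// Same POST as `export_metrics`, but `error_for_status` also turns
// 4xx/5xx replies into errors instead of silently accepting them.
async fn post_checked<T: Serialize>(url: &str, metrics: &T) -> Result<()> {
    Client::new()
        .post(url)
        .json(metrics)
        .send().await?
        .error_for_status()?;
    Ok(())
}
```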
diff --git a/crates/api-grub/src/monitoring.rs b/crates/api-grub/src/monitoring.rs
index d637746..aef585e 100644
--- a/crates/api-grub/src/monitoring.rs
+++ b/crates/api-grub/src/monitoring.rs
@@ -4,34 +4,30 @@ use serde_json::{Map, Value};
 use reqwest::Client;
 use tokio::sync::Semaphore;
 use std::sync::Arc;
-// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl};
-use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size};
+use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size};
-// use crate::structs::cmdb::Query;
 use integr_structs::api::enode_monitoring::cmdb::Query;
 use tokio::task::JoinHandle;
-// use crate::structs::get_chunk_size;
 use std::pin::Pin;
 use std::future::Future;
-use integr_structs::api::v3::{MetricOutput, PrometheusMetrics};
+use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
 use log::{error, info, warn};
-// use chrono::{Local, DateTime};
+use std::collections::HashMap;
 
 pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
     let timer = tokio::time::Instant::now();
     'outer: loop {
         let mut a = MonitoringImporter::new().await;
         a.start_session().await?;
+        let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
+
         'inner: loop {
             if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
                 break 'outer;
             }
-            let vec = a.get_metrics_list().await.unwrap_or_else(|_| vec![]);
-            if vec.is_empty() {
+            if let Err(_) = a.get_measure_info(vec.clone()).await {
                 warn!("Session dropped, creating new ...");
                 break 'inner;
             }
-            let _ = a.get_measure_info(Arc::new(vec)).await;
-            // a.close_session().await?;
             tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
         }
     }
@@ -70,36 +66,24 @@ impl MonitoringImporter {
         let client = Client::new();
         let url = format!("http://{}/e-data-front/auth/login", self.ip);
         let fortoken = ForTokenCredentials::new(&self.login, &self.password);
-        // dbg!(&fortoken);
+
         let client = client
             .post(url)
             .header("Content-Type", "application/json")
             .json(&fortoken);
         let resp = client.send().await?;
         let auth = resp.json::<AuthResponse>().await?;
-        // dbg!(&auth);
+
         self.set_ts(&fortoken.ts).await;
         self.access_token = auth.access_token.to_owned();
         Ok(())
     }
-    pub async fn close_session(&mut self) -> anyhow::Result<()> {
+
+    pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
         let client = Client::new();
-        let url = format!("http://{}/e-data-front/auth/logout", self.ip);
-        let client = client
-            .post(url)
-            .header("Content-Type", "application/json")
-            .header("access-token", &self.access_token);
-
-        let _ = client.send().await?;
-
-        self.access_token.clear();
-        Ok(())
-    }
-    pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<String>> {
-        let client = Client::new();
-        let mut vec: Vec<String> = Vec::new();
+        let mut vec: Vec<(String, String)> = Vec::new();
         let url = format!("http://{}/e-cmdb/api/query", self.ip);
         let client = client
             .post(url)
@@ -107,30 +91,30 @@ impl MonitoringImporter {
             .header("Content-Type", "application/json")
             .header("access-token", &self.access_token)
             .json(&Query::default());
         let resp = client.send().await?.text().await?;
-        // dbg!(&resp.text().await);
         let resp: Value = serde_json::from_str(&resp)?;
         if let Some(arr) = resp.as_array() {
             for measure in arr {
                 let id = measure.get("id");
                 let cls = measure.get("cls");
+                let name = measure.get("name");
                 if id.is_some() && cls.is_some() {
                     // todo: later wait for Vaitaliy call of classification
                     let id = id.unwrap().as_i64().unwrap_or_default();
                     let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
+                    let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
                     if cls.is_empty() {
                        return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
                     }
-                    // let measure_name = format!("{}${}", cls, id);
-                    vec.push(format!("{}${}", cls, id));
+                    vec.push((format!("{}${}", cls, id), name.to_string()));
                 }
             }
-            // dbg!(vec);
         } else {
             return Err(Error::msg("Invalid JSON in response"));
         }
+        info!("List of measures was pulled, total - {}", &vec.len());
         Ok(vec)
     }
-    pub async fn get_measure_info(&self, measures: Arc<Vec<String>>) -> anyhow::Result<()> {
+    pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
         let mut sys = sysinfo::System::new();
         sys.refresh_cpu_all();
         // adaptive permits on task spawn to prevent system overload
@@ -139,43 +123,40 @@ impl MonitoringImporter {
         let client = Arc::new(Client::new());
         let measures = measures.clone();
         let arc = Arc::new(self.clone());
-        // dbg!(&measures.display());
-
-        // dbg!(&measures.len());
-        for measure in measures.chunks(get_chunk_size(measures.len())) {
+        let chunk_size = get_chunk_size(measures.len());
+        info!("List of measures was divided into chunks of len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size);
+
+        for measure in measures.chunks(chunk_size) {
             let permit = sem.clone();
             let arc = arc.clone();
             let client = client.clone();
+            let hm = measure.lazy_unzip();
             let measure = Arc::new(measure.display());
             let _permit = permit.acquire().await.unwrap();
-            let jh: JoinHandle<anyhow::Result<PrometheusMetrics>> = tokio::spawn(async move {
-                Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await
+            let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
+                Self::process_endpoint(measure.clone(), client.clone(), arc.clone(), &hm).await
             });
             jh_vec.push(jh);
         }
-        // let mut vals = Vec::new();
+
         for event in jh_vec {
             match event.await {
                 Ok(val) => {
                     if let Ok(val) = val {
-                        match crate::export::Exporter::export_metrics(val).await {
-                            Ok(bytes) => info!("Successfully transmitted {} bytes to the Prometheus exporter", bytes),
+                        match crate::export::Exporter::export_extended_metrics(val).await {
+                            Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometheus exporter", bytes)},
                             Err(er) => error!("Cannot export data to the Prometheus exporter due to : `{}`", er),
                         }
-                        // vals.push(val);
                     }
                 },
                 Err(er) => println!("Fatal error on async task: {}", er),
             }
         }
-        // dbg!(&vals);
-        // dbg!(&vals.len());
         Ok(())
     }
-    async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<MonitoringImporter>) -> anyhow::Result<PrometheusMetrics> {
+    async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<MonitoringImporter>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
         let resp = client
             .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
             .header("Content-Type", "application/json")
@@ -185,23 +166,20 @@ impl MonitoringImporter {
         tokio::task::yield_now().await;
         let resp: Value = serde_json::from_str(&resp)?;
-        // let a = Self::extract_metric_data(resp);
         Ok(
-            PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await
+            PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
         )
     }
-    fn extract_metric_data(json: Value) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutput>>> + Send>> {
+    fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
         Box::pin(async move {
             return match json {
                 Value::Object(obj) => {
-                    // let resp: Value = serde_json::from_str(&obj)?;
-                    return Ok(vec![Self::process_value(&obj).await?])
+                    return Ok(vec![Self::process_value(&obj, hm).await?])
                 },
                 Value::Array(arr) => {
                     let mut vec = Vec::new();
                     for obj in arr {
-                        if let Ok(mut val) = Self::extract_metric_data(obj).await {
+                        if let Ok(mut val) = Self::extract_metric_data(obj, hm).await {
                             // vec.push(val);
                             vec.append(&mut val);
                         }
@@ -212,43 +190,50 @@ impl MonitoringImporter {
             }
         })
     }
-    async fn process_value(obj : &Map<String, Value>) -> anyhow::Result<MetricOutput> {
-        let id = obj.get("id");
-        let val = obj.get("value");
-
-        if id.is_none() || val.is_none() {
-            return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
+    async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
+        let id = obj.get("$id");
+        let val = obj.get("value");
+        let description = {
+            let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null);
+            let zero = String::new();
+            if dola_ip.is_null() {
+                zero
+            } else {
+                hm.get(
+                    dola_ip.as_str().unwrap_or_else(|| "")
+                )
+                .unwrap_or_else(|| &zero)
+                .to_owned()
+            }
-        let id = id.unwrap().as_str().unwrap_or_else(|| "");
-        let val = val.unwrap();
+        };
 
-        if id.is_empty() {
-            return Err(Error::msg("Empty `id` field. Invalid JSON response"))
-        }
-        // pub struct MetricOutput {
-        //     pub id : String,
-        //     #[serde(rename = "type")]
-        //     json_type : String,
-        //     addr : String,
-        //     pub value : Value,
-        // }
+        if id.is_none() || val.is_none() {
+            return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
+        }
+        let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_");
+        let val = val.unwrap();
 
-        Ok(MetricOutput {
-            id : id.to_owned(),
-            json_type : match val {
-                Value::Number(val) => {
-                    if val.is_i64() {
-                        "i64".to_owned()
-                    } else if val.is_u64() {
-                        "u64".to_owned()
-                    } else {
-                        "f64".to_owned()
-                    }
-                },
-                _ => "unknown".to_owned(),
-            },
-            addr : "enode.monitoring.api".to_owned(),
-            value : val.clone()
-        })
+        if id.is_empty() {
+            return Err(Error::msg("Empty `id` field. Invalid JSON response"))
+        }
+
+        Ok(MetricOutputExtended {
+            id : id.to_owned(),
+            json_type : match val {
+                Value::Number(val) => {
+                    if val.is_i64() {
+                        "i64".to_owned()
+                    } else if val.is_u64() {
+                        "u64".to_owned()
+                    } else {
+                        "f64".to_owned()
+                    }
+                },
+                _ => "unknown".to_owned(),
+            },
+            addr : "enode.monitoring.api".to_owned(),
+            desc : description,
+            value : val.clone()
+        })
     }
 }
\ No newline at end of file
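Reviewer note on the semaphore in `get_measure_info` above: `_permit` is dropped at the end of each loop iteration, right after `tokio::spawn`, so it paces spawning but does not bound how many requests are in flight at once. A sketch of the owned-permit variant that does (the sizing and task body are placeholders):

``` rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// At most `max_in_flight` tasks run concurrently: each task keeps its
// permit until it finishes, instead of returning it right after spawn.
async fn spawn_capped(chunks: Vec<String>, max_in_flight: usize) {
    let sem = Arc::new(Semaphore::new(max_in_flight));
    let mut handles = Vec::new();
    for chunk in chunks {
        let permit = sem.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // released only when the task ends
            // ... perform the HTTP request for this `chunk` here ...
            chunk.len()
        }));
    }
    for handle in handles {
        let _ = handle.await;
    }
}
```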
diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs
index 8b21b39..6e25400 100644
--- a/crates/api-grub/src/net.rs
+++ b/crates/api-grub/src/net.rs
@@ -1,22 +1,18 @@
 // module to handle unix-socket connection + pulling info from api
+
 use anyhow::Result;
-// use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
 use log::{error, info};
 use rand::random;
 use tokio::sync::mpsc::Receiver;
 use tokio::time::{sleep, Duration};
-use reqwest::{Client, Method};
+use reqwest::Client;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 use tokio::task::JoinHandle;
-// use tokio::sync::Mutex;
 use dotenv::dotenv;
 use crate::json::JsonParser;
 use crate::export::Exporter;
 use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
-// use md5::compute;
-
-// type BufferType = Arc<Mutex<Vec<_>>>;
 
 // for api info pulling
 pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {
@@ -42,32 +38,13 @@ pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>)
         }
         let shared_pool = shared_pool.clone();
         info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
-        // sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
         }
     }
-    // Ok(())
 }
-
-struct RestMethod;
-
-impl RestMethod {
-    pub async fn from_str(method: &str) -> Method {
-        return match method.trim().to_lowercase().as_str() {
-            "post" => Method::POST,
-            "patch" => Method::PATCH,
-            "put" => Method::PUT,
-            "delete" => Method::DELETE,
-            "head" => Method::HEAD,
-            "trace" => Method::TRACE,
-            "options" => Method::OPTIONS,
-            "connect" => Method::CONNECT,
-            "get" | _ => Method::GET
-        }
-    }
-}
 
 struct ApiPoll<'a> {
     config : &'a mut Config,
+    #[allow(unused)]
     client : Client,
 }
@@ -78,26 +55,18 @@ impl<'a> ApiPoll<'a> {
             client : Client::new(),
         }
     }
-    // can be weak and with bug test needed
     pub async fn change_config(&mut self, conf: Config) {
         *self.config = conf;
     }
     pub async fn is_default(&self) -> bool {
         self.config.is_default().await
     }
-    // pub async fn get_delay(&self) -> u32 {
-    //     self.config.timeout
-    // }
     pub async fn process_metrics(
         service_id: Arc<String>,
         metrics: Arc<Metrics>,
         creds: Credentials,
-        // exporter: Arc<Exporter>
     ) -> Result<()> {
         // processing metrics
-        // let mut req = Client::new()
-        //     // .user_agent("api_grub/integration_module")
-        //     .get(&metrics.url);
         use std::hash::DefaultHasher;
         let rand = random::<u64>();
@@ -113,32 +82,15 @@ impl<'a> ApiPoll<'a> {
         let api_key = &creds.endpoint.api_key;
 
         if !login.is_empty() && !password.is_empty() {
-            // dbg!("kjgbkasgksjd");
             req = req.basic_auth(login, Some(password));
         }
         if !api_key.is_empty() {
-            // req = req.bearer_auth(&api_key);
-            // req = req.header("authorization", "bearer ");
-
             req = req.header("accept", "application/json");
             req = req.header("x-api-key", api_key);
-
-            // req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
         }
-        // dbg!(&req);
-        // let (client, res) = req.build_split();
-        // let res = res.unwrap();
-        // res.url_mut().is_special()
-
-
-
-        // dbg!(client);
-        // dbg!(res);
-        // todo!();
         match req.send().await {
             Ok(resp) => {
-                // dbg!(&resp.text().await);
                 if let Ok(response) = resp.text().await {
                     match serde_json::to_value(&response) {
                         Err(er) => {
@@ -147,7 +99,6 @@ impl<'a> ApiPoll<'a> {
                         Ok(_) => {
                             let endpoint_name = &metrics.name;
                             let preproc = JsonParser::parse(&metrics.measure, &response);
-                            // dbg!(serde_json::to_string_pretty(&preproc));
                             let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
                             match Exporter::export_metrics(preproc).await {
                                 Ok(bytes) => {
@@ -170,18 +121,15 @@ impl<'a> ApiPoll<'a> {
         Ok(())
     }
     pub async fn process_endpoint(
-        // client : Arc<Client>,
         config : Arc<ConfigEndpoint>,
         creds : Credentials,
-        // exporter : Arc<Exporter>
     ) -> Result<()> {
-        //
-        let period = config.get_period().unwrap_or(0);
+        // TODO: HAVE TO BE USED
+        let _period = config.get_period().unwrap_or(0);
         let timeout = config.get_timeout().unwrap_or(5);
         let metrics = Arc::new(config.metrics.clone());
         let service_id = Arc::new(config.id.clone());
         loop {
-            // let exporter = exporter.clone();
             let creds = creds.clone();
             let metrics = metrics.clone();
             let service_id = service_id.clone();
@@ -211,28 +159,23 @@ impl<'a> ApiPoll<'a> {
             // processing
             sleep(Duration::from_secs(timeout)).await
         }
-        Ok(())
     }
     pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
-        // let buffer: BufferType = Arc::new(Mutex::new(vec![]));
-        // let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
-        // let client = Arc::new(self.client.clone());
         let config = Arc::new(self.config.clone());
         let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
         let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
         for (idx, _) in config.config.iter().enumerate() {
-            // let for_creds = endpoints[idx].clone();
             let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
             let endpoint = endpoints[idx].clone();
-            // let client = client.clone();
+
+            // TODO: USE EXPORTER
+            #[allow(unused)]
             let exporter = exporter.clone();
            let join_handler = tokio::spawn(async move {
                 Self::process_endpoint(
-                    // client,
                     endpoint,
                     creds,
-                    // exporter.clone()
                 ).await
             });
             join_handles.push(join_handler);
@@ -245,6 +188,7 @@ impl<'a> ApiPoll<'a> {
     }
 }
 
+// TODO: FIX TESTS
 // #[cfg(test)]
 // mod net_unittests {
 //     use super::*;
diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs
index 0c646f8..d96feeb 100644
--- a/crates/integr-structs/src/api.rs
+++ b/crates/integr-structs/src/api.rs
@@ -243,7 +243,7 @@ pub mod v3 {
         pub value : Value,
     }
     impl MetricOutput {
-        pub fn new_with_slices(id : &str, json_type : &str, addr: &str,value : Value) -> Self {
+        pub fn new_with_slices(id : &str, json_type : &str, addr: &str, value : Value) -> Self {
             MetricOutput {
                 id : id.to_string(),
                 json_type : json_type.to_string(),
@@ -252,6 +252,27 @@ pub mod v3 {
             }
         }
     }
+    #[derive(Serialize, Deserialize, Debug)]
+    pub struct MetricOutputExtended {
+        pub id : String,
+        #[serde(rename = "type")]
+        pub json_type : String,
+        pub addr : String,
+        pub value : Value,
+        #[serde(rename = "description")]
+        pub desc : String,
+    }
+    impl MetricOutputExtended {
+        pub fn new_with_slices(id : &str, json_type : &str, addr: &str, desc : &str, value : Value) -> Self {
+            MetricOutputExtended {
+                id : id.to_string(),
+                json_type : json_type.to_string(),
+                addr : addr.to_string(),
+                value : value,
+                desc : desc.to_string(),
+            }
+        }
+    }
 
     #[derive(Serialize, Deserialize, Debug)]
     pub struct PrometheusMetrics {
@@ -281,10 +302,33 @@ pub mod v3 {
             str_metrics.len()
         }
     }
+    #[derive(Serialize, Deserialize, Debug)]
+    pub struct PrometheusMetricsExtended {
+        pub service_name: String,
+        pub endpoint_name: String,
+        pub metrics: Vec<MetricOutputExtended>,
+    }
+    impl PrometheusMetricsExtended {
+        pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
+            Self {
+                service_name : "zvks".to_owned(),
+                endpoint_name : "apiforsnmp".to_owned(),
+                metrics : metrics,
+            }
+        }
+        pub fn get_bytes_len(&self) -> usize {
+            let str_metrics = serde_json::to_vec(self).unwrap_or_else(
+                |_| Vec::new()
+            );
+            str_metrics.len()
+        }
+    }
 }
 
 pub mod enode_monitoring {
+    use std::hash::Hash;
+
     use super::*;
 
     #[derive(Debug, Serialize)]
@@ -383,7 +427,14 @@ pub mod enode_monitoring {
         fn display(&self) -> String;
     }
 
-    impl<T> GenericUrl for [T]
+    pub trait LazyUnzip<K, V>
+    where
+        V : Clone,
+        K : Hash + Eq + Clone {
+        fn lazy_unzip(&self) -> HashMap<K, V>;
+    }
+
+    impl<T> GenericUrl for [(T, T)]
     where T : Display {
         fn display(&self) -> String {
             let mut vec: Vec<String> = Vec::new();
@@ -394,12 +445,23 @@ pub mod enode_monitoring {
                 if id > 0 {
                     vec.push(",".to_owned());
                 }
-                vec.push(format!("%22{}%22", val));
+                vec.push(format!("%22{}%22", val.0));
             });
             vec.push("%5D".to_owned());
             vec.concat()
         }
     }
+    impl<K, V> LazyUnzip<K, V> for [(K, V)]
+    where
+        V : Clone,
+        K : Hash + Eq + Clone {
+        fn lazy_unzip(&self) -> HashMap<K, V> {
+            let mut hm = HashMap::new();
+            self.into_iter()
+                .for_each(|(key, val)| {hm.insert(key.to_owned(), val.to_owned());});
+            hm
+        }
+    }
 
     pub fn get_chunk_size(total_measures: usize) -> usize {
         match total_measures {
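The two trait impls added above are easiest to see side by side: over a chunk of `(measure_id, name)` pairs, `lazy_unzip` builds the id-to-name map that `process_value` uses for descriptions, while `display` renders the URL-encoded id list for the mirror request. A small usage sketch with made-up ids:

``` rust
use integr_structs::api::enode_monitoring::{GenericUrl, LazyUnzip};

fn main() {
    let chunk = vec![
        ("Pearl$101".to_string(), "CPU load".to_string()),
        ("Pearl$102".to_string(), "Memory usage".to_string()),
    ];
    // id -> human-readable name, consumed by `process_value`.
    let hm = chunk.lazy_unzip();
    assert_eq!(hm.get("Pearl$101").map(String::as_str), Some("CPU load"));
    // URL-encoded JSON array of the ids, roughly
    // %5B%22Pearl$101%22,%22Pearl$102%22%5D.
    println!("{}", chunk.display());
}
```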
diff --git a/crates/preproc/Cargo.toml b/crates/preproc/Cargo.toml
deleted file mode 100644
index 9ef8c92..0000000
--- a/crates/preproc/Cargo.toml
+++ /dev/null
@@ -1,14 +0,0 @@
-[package]
-name = "preproc"
-version = "0.1.0"
-edition = "2021"
-
-[dependencies]
-anyhow = "1.0.95"
-chrono = "0.4.39"
-dotenv = "0.15.0"
-env_logger = "0.11.6"
-log = "0.4.25"
-serde = { version = "1.0.217", features = ["derive"] }
-serde_json = "1.0.137"
-tokio = { version = "1.43.0", features = ["full"] }
diff --git a/crates/preproc/src/config.rs b/crates/preproc/src/config.rs
deleted file mode 100644
index e22c5e0..0000000
--- a/crates/preproc/src/config.rs
+++ /dev/null
@@ -1,21 +0,0 @@
-// mod for preproc config pulling and updating
-
-
-
-
-
-
-
-#[cfg(test)]
-mod config_unittests {
-    use tokio::test;
-
-    #[test]
-    async fn create_unix_socket_server() { assert!(true) }
-
-    #[test]
-    async fn verify_on_valid_config() { assert!(true) }
-
-    #[test]
-    async fn verify_on_invalid_config() { assert!(true) }
-}
\ No newline at end of file
diff --git a/crates/preproc/src/logger.rs b/crates/preproc/src/logger.rs
deleted file mode 100644
index 6578b44..0000000
--- a/crates/preproc/src/logger.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use chrono::Local;
-use env_logger::Builder;
-use log::LevelFilter;
-use std::io::Write;
-use anyhow::Result;
-use log::info;
-
-pub async fn setup_logger() -> Result<()> {
-    Builder::new()
-        .format(move |buf, record| {
-            writeln!(
-                buf,
-                "|{}| {} [{}] - {}",
-                "config-delivery",
-                Local::now().format("%d-%m-%Y %H:%M:%S"),
-                record.level(),
-                record.args(),
-            )
-        })
-        .filter(None, LevelFilter::Info)
-        .target(env_logger::Target::Stdout)
-        .init();
-
-    info!("Logger configured");
-    Ok(())
-}
-
-
-#[cfg(test)]
-mod logger_unittests {
-    use tokio::test;
-    use super::*;
-    #[test]
-    async fn check_logger_builder() {
-        Builder::new()
-            .format(move |buf, record| {
-                writeln!(
-                    buf,
-                    "|{}| {} [{}] - {}",
-                    "config-delivery",
-                    Local::now().format("%d-%m-%Y %H:%M:%S"),
-                    record.level(),
-                    record.args(),
-                )
-            })
-            .filter(None, LevelFilter::Info)
-            .target(env_logger::Target::Stdout)
-            .init();
-    }
-}
\ No newline at end of file
diff --git a/crates/preproc/src/main.rs b/crates/preproc/src/main.rs
deleted file mode 100644
index 3d9c8e1..0000000
--- a/crates/preproc/src/main.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-mod config;
-mod transform;
-mod logger;
-
-use logger::setup_logger;
-use dotenv::dotenv;
-use anyhow::Result;
-use log::info;
-
-#[tokio::main(flavor = "multi_thread")]
-async fn main() -> Result<()> {
-    let _ = setup_logger().await?;
-
-    info!("Pulling env vars from .env file if exists ...");
-    dotenv().ok();
-
-    Ok(())
-}
diff --git a/crates/preproc/src/transform.rs b/crates/preproc/src/transform.rs
deleted file mode 100644
index 43337c5..0000000
--- a/crates/preproc/src/transform.rs
+++ /dev/null
@@ -1 +0,0 @@
-// mod for preprocessing and transferring to the CM metrics data
\ No newline at end of file
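One last check worth recording for `MetricOutputExtended`: thanks to the `#[serde(rename = ...)]` attributes added in `integr-structs`, `json_type` goes over the wire as `type` and `desc` as `description`. A quick probe (field values are hypothetical):

``` rust
use integr_structs::api::v3::MetricOutputExtended;
use serde_json::json;

fn main() -> anyhow::Result<()> {
    let metric = MetricOutputExtended::new_with_slices(
        "Pearl_101", "i64", "enode.monitoring.api", "CPU load", json!(17),
    );
    println!("{}", serde_json::to_string_pretty(&metric)?);
    // {
    //   "id": "Pearl_101",
    //   "type": "i64",
    //   "addr": "enode.monitoring.api",
    //   "value": 17,
    //   "description": "CPU load"
    // }
    Ok(())
}
```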