diff --git a/.env b/.env deleted file mode 100644 index 2cbea56..0000000 --- a/.env +++ /dev/null @@ -1,3 +0,0 @@ -CONFIG_SERVER_CREDS = "ws://127.0.0.1:8080" -API_GRUBBER_SOCKET = "api-grub.sock" -PREPROC_SOCKET = "preproc.sock" \ No newline at end of file diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..2604f9a --- /dev/null +++ b/.env.example @@ -0,0 +1,15 @@ +# Template .env for API grabber + +# PostgreSQL connection [DEPRECATED] +DB_HOST = "ip.addr.postgresql.server" +DB_USER = "db_user" +DB_PASSWORD = "db_user_password" +DB_DBNAME = "db_name"1 + +# Prometheus-Exporter info +EXPORTER_URL = "http(s)://ip.ip.ip.ip:port" + +# eNODE.Monitoring configuration +ENODE_MONITORING_IP = "ip.ip.ip.ip" +ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required +ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required \ No newline at end of file diff --git a/.gitignore b/.gitignore index 5195f75..2a25479 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target Cargo.lock -*.sock \ No newline at end of file +*.sock +.env \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index bbeed62..8f70c37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" members = [ - "crates/api-grub", "crates/config-delivery", "crates/integr-structs", "crates/preproc", + "crates/api-grub", "crates/integr-structs", "crates/preproc", ] [profile.dev] diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5215521 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,18 @@ +FROM rust:1.84 AS builder +WORKDIR /app + +RUN apt update && apt install -y musl-tools +RUN rustup target add x86_64-unknown-linux-musl + +COPY . . +RUN cargo test +RUN cargo build --release --target=x86_64-unknown-linux-musl + +FROM alpine:latest +WORKDIR /app + +COPY --from=builder /app/target/x86_64-unknown-linux-musl/release/api-grub /app/api-grub +RUN apk add --no-cache ca-certificates + + +ENTRYPOINT ["/app/api-grub"] \ No newline at end of file diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..2e83346 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,80 @@ +pipeline { + agent any + environment { + REGISTRY_NAME = 'registry.entcor/trust-module' + IMAGE_NAME = "integration-module" + GITEA_REPOSITORY_URL = "http://git.entcor/api/v1/repos/" + } + + stages { + stage('Init variables') { + when { + expression { env.CHANGE_BRANCH?.startsWith('rc') } + } + steps { + script { + env.IMAGE_TAG = sh(script: "git describe --tags --abbrev=0", returnStdout: true).trim() + } + } + } + stage('Build tagged image and run tests') { + when { + expression { env.CHANGE_BRANCH?.startsWith('rc') } + } + steps { + script { + try { + def image = docker.build("${env.IMAGE_NAME}:${env.IMAGE_TAG}") + sh "docker tag ${env.IMAGE_NAME}:${env.IMAGE_TAG} ${env.REGISTRY_NAME}/${env.IMAGE_NAME}:${env.IMAGE_TAG}" + } catch (Exception e) { + error("Tests failed: ${e.message}") + } + } + } + } + stage ('Push docker image to registry') { + when { + expression { env.CHANGE_BRANCH?.startsWith('rc') } + } + steps { + script { + docker.withRegistry('https://registry.entcor/harbor/', 'harbor-credentials-id') { + docker.image("${env.REGISTRY_NAME}/${env.IMAGE_NAME}:${env.IMAGE_TAG}").push() + } + } + } + } + } + post { + always { + script { + echo "Cleaning up workspace..." + sh "rm -rf ${env.WORKSPACE}/rc/ || true" + } + } + success { + script { + if (env.CHANGE_BRANCH?.startsWith('rc')) { + echo "Attempting to merge PR ${env.CHANGE_ID} into master..." + withCredentials([usernamePassword(credentialsId: 'gitea_creds', usernameVariable: 'GITEA_USER', passwordVariable: 'GITEA_PASS')]) { + def prId = env.CHANGE_ID + sh """ + curl -X POST \ + -u "${GITEA_USER}:${GITEA_PASS}" \ + -H "Content-Type: application/json" \ + -d '{"do":"merge"}' \ + http://git.entcor/api/v1/repos/deployer3000/integration-module/pulls/${prId}/merge + """ + echo "PR ${prId} merged successfully into master!" + } + } + } + } + failure { + echo "Pipeline failed. Check the logs for details." + } + aborted { + echo "Pipeline was aborted." + } + } +} \ No newline at end of file diff --git a/README.md b/README.md index c8897c7..46b067c 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ | Crate (submodule) | Progress | |---|---| -|`api-grub` | ✅✅✅✅✅✅✅🔲🔲🔲 | -|`config-delivery` | ✅✅✅✅🔲🔲🔲🔲🔲🔲 | -|`integrs-structs` | ✅✅✅✅✅✅🔲🔲🔲🔲 | -|`preproc` | 🔲🔲🔲🔲🔲🔲🔲🔲🔲🔲 | +|`api-grub` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ | +|`config-delivery` | ✅✅✅✅✅✅✅🛠️🛠️🛠️ | +|`integrs-structs` | ✅✅✅✅✅✅✅✅✅✅ | +|`preproc` [temp-deprecated] | ✅✅✅❌❌❌❌❌❌❌ | (разработка временно остановлена) diff --git a/config_api.json b/config_api.json index 9136b29..d9d25fa 100644 --- a/config_api.json +++ b/config_api.json @@ -1,13 +1,32 @@ { - "endpoints" : [ - { - "url" : "http://127.0.0.1:8081/ping", - "method" : "GET" - }, - { - "url" : "http://127.0.0.1:8081/", - "method" : "GET" + "config": [ + { + "id":"zvks", + "login" : "", + "pass" : "", + "api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711", + "period" : "", + "timeout" : "5", + "metrics" : [ + { + "name": "conferences", + "url": "https://demo.vcs.vinteo.dev/api/v1/conferences", + "measure": [ + { "id":"number", "type": "text", "addr": "data.conferences[].number" }, + { "id":"total", "type": "integer", "addr": "data.total" }, + { "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" }, + { "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" }, + { "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" } + ] + }, + { + "name": "abonents", + "url": "https://demo.vcs.vinteo.dev/api/v1/accounts", + "measure": [ + { "id":"total", "type": "integer", "addr": "data.total" } + ] + } + ] } - ], - "delay" : 5 + ] } \ No newline at end of file diff --git a/crates/api-grub/Cargo.toml b/crates/api-grub/Cargo.toml index 8c51835..0e54639 100644 --- a/crates/api-grub/Cargo.toml +++ b/crates/api-grub/Cargo.toml @@ -12,4 +12,11 @@ env_logger = "0.11.6" log = "0.4.25" anyhow = "1.0.95" chrono = "0.4.39" -reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] } \ No newline at end of file +reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] } +deadpool-postgres = { version = "0.14.1", features = ["serde"] } +tokio-postgres = "0.7.12" +dotenv = "0.15.0" +md5 = "0.7.0" +rand = "0.9.0" +sysinfo = "0.33.1" +openssl = { version = "0.10", features = ["vendored"] } diff --git a/crates/api-grub/src/config.rs b/crates/api-grub/src/config.rs index 9a30354..b945d31 100644 --- a/crates/api-grub/src/config.rs +++ b/crates/api-grub/src/config.rs @@ -1,7 +1,7 @@ // mod to communicate with api-grub config file // 1) check changes in unix-socket // 2) save changes in local config file -use integr_structs::api::ApiConfig; +use integr_structs::api::ApiConfigV2; use anyhow::{Error, Ok, Result}; use log::{info, warn, error}; use std::{fs, path::Path}; @@ -10,12 +10,13 @@ use tokio::{io::AsyncReadExt, net::UnixListener}; use tokio::time::{sleep, Duration}; use std::result::Result::Ok as stdOk; use tokio::sync::mpsc::Sender; +use integr_structs::api::v3::Config; const CONFIG_PATH: &str = "config_api.json"; const SOCKET_PATH: &str = "api-grub.sock"; // todo! rewrite to use current_exe -pub async fn pull_local_config() -> Result { +pub async fn pull_local_config() -> Result { // let conf_path = std::env::current_exe()?; let path = Path::new(CONFIG_PATH); // return match conf_path.parent() { @@ -28,7 +29,7 @@ pub async fn pull_local_config() -> Result { // None => Err(Error::msg("No local conf was found")) // } if path.exists() && path.is_file() { - let config: ApiConfig = from_str( + let config: Config = from_str( &fs::read_to_string(CONFIG_PATH)? )?; Ok(config) @@ -39,7 +40,7 @@ pub async fn pull_local_config() -> Result { // for config pulling // ++++ reader to channel -pub async fn init_config_grub_mechanism(tx: &Sender) -> Result<()> { +pub async fn init_config_grub_mechanism(tx: &Sender) -> Result<()> { info!("Initializing Unix-Socket listening for pulling new configs..."); let server = init_unix_listener().await?; // @@ -51,7 +52,7 @@ pub async fn init_config_grub_mechanism(tx: &Sender) -> Result<()> { if let Err(er) = stream.read_to_string(&mut buffer).await { warn!("Cannot read config from stream due to {}", er); } else { - let config: Result = from_str(&buffer); + let config: Result = from_str(&buffer); if let stdOk(conf) = config { info!("New config was pulled from Unix-Stream. Saving it locally and sharing with API-grub module..."); if let Err(er) = save_new_config(&buffer).await { @@ -97,13 +98,13 @@ mod config_unittests { #[test] async fn check_save_new_config() { use std::fs; - use integr_structs::api::ApiConfig; + use integr_structs::api::v3::Config; use serde_json::to_string; let test_config_path = "test_config_api.json"; // config gen - let config = to_string::(&ApiConfig::default()); + let config = to_string::(&Config::default()); assert!(config.is_ok()); let config = config.unwrap(); diff --git a/crates/api-grub/src/export.rs b/crates/api-grub/src/export.rs new file mode 100644 index 0000000..ed7ef88 --- /dev/null +++ b/crates/api-grub/src/export.rs @@ -0,0 +1,71 @@ +use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient}; +use integr_structs::api::v3::PrometheusMetrics; +use reqwest::Client; +use tokio_postgres::NoTls; +use std::env; +use anyhow::{Result}; +use log::{info, error}; + +pub struct Exporter { + pool : Option, +} + +impl Exporter { + fn config_construct() -> Result { + let mut cfg = Config::new(); + cfg.host = Some(env::var("DB_HOST")?); + cfg.dbname = Some(env::var("DB_DBNAME")?); + cfg.user = Some(env::var("DB_USER")?); + cfg.password = Some(env::var("DB_PASSWORD")?); + Ok(cfg) + } + fn pool_construct() -> Option { + return match Self::config_construct() { + Ok(config) => { + if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) { + info!("Connected to PostgreSQL"); + return Some(pool); + } + None + }, + Err(_) => { + error!("Bad DB credentials or it's unreachable"); + None + }, + } + } + pub fn is_no_connection(&self) -> bool { self.pool.is_none() } + pub fn init() -> Self { + Self { + pool : Self::pool_construct(), + } + } + pub async fn get_connection_from_pool(&self) -> Option { + if let Some(pool) = &self.pool { + return Some(pool.get().await.ok()?); + } + None + } + pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> { + // client. + let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?; + let _ = client.query(&query, &[&metrics]).await?; + Ok(()) + } + pub async fn export_metrics(metrics: PrometheusMetrics) -> Result { + let url = env::var("EXPORTER_URL")?; + // let req = Request::new(Method::PUT, + // Url::parse(metrics)?); + // dbg!(&metrics); + let req = Client::new() + .post(url) + .json(&metrics) + .send().await; + // dbg!(&req); + // dbg!(&req.unwrap().text().await); + // todo : rewrite with status code wrapping + req?; + Ok(metrics.get_bytes_len()) + } + +} \ No newline at end of file diff --git a/crates/api-grub/src/json.rs b/crates/api-grub/src/json.rs new file mode 100644 index 0000000..c6ecedd --- /dev/null +++ b/crates/api-grub/src/json.rs @@ -0,0 +1,155 @@ +// use serde::{de::value, Serialize}; +use serde_json::{json, Value}; +use integr_structs::api::v3::{Metric, MetricOutput}; + +pub struct JsonParser; + +impl JsonParser { + pub fn parse(targets: &Vec, json: &str) -> Vec { + let mut res_vec: Vec = Vec::new(); + for target in targets { + let metric = match target.addr.contains("[") { + true => JsonParser::get_sum_of_metrics_in_array(target, json), + false => JsonParser::get_metric(target, json), + }; + res_vec.push(MetricOutput::new_with_slices(&target.id, &target.json_type, &target.addr, metric)); + } + res_vec + } + fn get_sum_of_metrics_in_array(target: &Metric, json: &str) -> Value { + if target.addr.is_empty() { + return Value::Null; + } + + // let mut vec_value: Vec = Vec::new(); + // let mut array_key = String::new(); + let mut value_json: Value = serde_json::from_str(json).unwrap_or(Value::Null); + + let target_attr_vec = target.addr + .split_terminator('.') + .collect::>(); + // for keys in [] brackets + let mut key_tag = String::new(); + + for (global_idx, &key) in target_attr_vec.iter().enumerate() { + // if array + let key_checked = if key.contains('[') { + let key_idx = key.find("[").unwrap(); + key_tag = key.chars() + .enumerate() + .filter(|(idx, chr)| *idx > key_idx && *chr != ']') + .map(|(_, chr)| chr) + .collect::(); + // dbg!(&key_tag); + key.chars() + .enumerate() + .filter(|(idx, _)| *idx < key_idx) + .map(|(_, chr)| chr) + .collect::() + // dbg!(value_json.get(&array_key).unwrap_or(&Value::Null)); + // value_json = value_json.get(array_key).unwrap_or(&Value::Null).clone(); + // continue; + // new_key.as_str() + // dbg!(key); APPROVED + // TODO: need to check key in [] type of [KEY] + } else {key.to_owned()}; + + // if already array + match value_json.get(key_checked) { + Some(val) => { + match val { + Value::Array(array) => { + // form new target array + let new_array_target = target_attr_vec + .iter() + .enumerate() + .filter(|(idx, _)| *idx > global_idx) + .map(|(_, &chr)| chr.to_owned()) + .collect::>(); + // get_values_in_array + // get_tags_in_array + // // slice_tags_with_values + // dbg!(&array); + // dbg!(&new_array_target); + let res_arr = Self::get_values_in_array(array, &new_array_target); + if &key_tag == "" { + return res_arr.into(); + } + return Self::slice_with_tags_in_array(array, &res_arr, &key_tag).into() + + }, + _ => value_json = val.clone(), + } + }, + None => return Value::Null, + } + } + value_json + } + + fn slice_with_tags_in_array(array: &Vec, metrics: &Vec, tag_name: &str) -> Vec { + if tag_name.is_empty() { + return metrics.clone(); + } + // array[0].as_object().unwrap_or(json!(Value::Null)) + let mut values: Vec = Vec::new(); + array.iter() + .enumerate() + .map(|(idx, val)| { + let val = val.get(tag_name).unwrap_or(&Value::Null).clone(); + (serde_json::from_value::>( + json!({"tag_name": tag_name, "tag_value": val}) + ), + serde_json::from_value::>(metrics[idx].clone())) + }) + .for_each(|(tags, val)| { + if val.is_ok() && tags.is_ok() { + let mut tags = tags.unwrap(); + let mut val = val.unwrap(); + tags.append(&mut val); + dbg!(&tags); + values.push(json!(tags)); + } + }); + if values.len() == 0 { + return metrics.clone(); + } + values + } + fn get_values_in_array(array: &Vec, fields: &Vec) -> Vec { + let mut values: Vec = Vec::new(); + for obj in array { + // dbg!(obj); + let mut obrez = obj.clone(); + for field in fields { + obrez = obrez.get(field).unwrap_or(&Value::Null).clone(); + match obrez { + Value::Object(_) => {continue}, + _ => { + values.push(json!({field: obrez.clone()})); + }, + // None => {values.push(Value::Null)}, + } + } + } + values + } + + fn get_metric(target: &Metric, json: &str) -> Value { + if target.addr.is_empty() { + return Value::Null; + } + let mut value_json: Value = serde_json::from_str(json).unwrap_or(Value::Null); + + let target_attr_vec = target.addr + .split_terminator('.') + .collect::>(); + for key in target_attr_vec { + match value_json.get(key) { + Some(val) => value_json = val.clone(), + None => return Value::Null, + } + } + value_json + } +} diff --git a/crates/api-grub/src/main.rs b/crates/api-grub/src/main.rs index 208d646..cac81e3 100644 --- a/crates/api-grub/src/main.rs +++ b/crates/api-grub/src/main.rs @@ -1,35 +1,74 @@ mod config; mod net; mod logger; +mod json; +mod export; +mod monitoring; use anyhow::Result; -use integr_structs::api::ApiConfig; +// use integr_structs::api::ApiConfigV2; +use integr_structs::api::v3::Config; use logger::setup_logger; -use log::{info, warn}; +// use log::{info, warn}; use config::{pull_local_config, init_config_grub_mechanism}; use net::init_api_grub_mechanism; use tokio::sync::mpsc; +use log::{error, info, warn}; +use monitoring::get_metrics_from_monitoring; #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()>{ - // 3 coroutines - // 1) unix-socket coroutine (for config updating) - // 2) api coroutine - // 3) ? + dotenv::dotenv().ok(); setup_logger().await?; let config = get_config().await; // config update channel - let (tx, mut rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::(1); // futures - let config_fut = init_config_grub_mechanism(&tx); - let grub_fut = init_api_grub_mechanism(config, &mut rx); + // todo : rewrite with spawn + // let config_fut = init_config_grub_mechanism(&tx); + // let grub_fut = init_api_grub_mechanism(config, &mut rx); - let _ = tokio::join!(config_fut, grub_fut); + let event_config = tokio::spawn(async move { + match init_config_grub_mechanism(&tx).await { + Ok(_) => { + info!("Config task deinitialized"); + }, + Err(er) => { + error!("Config task returned an error : {}", er); + }, + } + }); + let event_grub = tokio::spawn(async move { + if std::env::var("ENODE_MONITORING_IP").is_ok() { + match get_metrics_from_monitoring(0, 5).await { + Ok(_) => { + info!("Grabing (eNODE.Monitoring) task deinitialized"); + }, + Err(er) => { + error!("Grabing task returned an error : {}", er); + }, + } + } else { + match init_api_grub_mechanism(config, &mut rx).await { + Ok(_) => { + info!("Grabing task deinitialized"); + }, + Err(er) => { + error!("Grabing task returned an error : {}", er); + }, + } + } + }); + let events_handler = vec![event_config, event_grub]; + for event in events_handler { + let _ = event.await; + } + // let _ = tokio::join!(config_fut, grub_fut); Ok(()) } -async fn get_config() -> ApiConfig { +async fn get_config() -> Config { return match pull_local_config().await { Ok(conf) => { info!("Local config was loaded"); @@ -37,7 +76,7 @@ async fn get_config() -> ApiConfig { }, Err(er) => { warn!("Cannot get local config due to {}", er); - ApiConfig::default() + Config::default() } } } \ No newline at end of file diff --git a/crates/api-grub/src/monitoring.rs b/crates/api-grub/src/monitoring.rs new file mode 100644 index 0000000..d637746 --- /dev/null +++ b/crates/api-grub/src/monitoring.rs @@ -0,0 +1,254 @@ +use std::env; +use anyhow::Error; +use serde_json::{Map, Value}; +use reqwest::Client; +use tokio::sync::Semaphore; +use std::sync::Arc; +// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl}; +use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size}; +// use crate::structs::cmdb::Query; +use integr_structs::api::enode_monitoring::cmdb::Query; +use tokio::task::JoinHandle; +// use crate::structs::get_chunk_size; +use std::pin::Pin; +use std::future::Future; +use integr_structs::api::v3::{MetricOutput, PrometheusMetrics}; +use log::{error, info, warn}; +// use chrono::{Local, DateTime}; + +pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> { + let timer = tokio::time::Instant::now(); + 'outer: loop { + let mut a = MonitoringImporter::new().await; + a.start_session().await?; + 'inner: loop { + if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) { + break 'outer; + } + let vec = a.get_metrics_list().await.unwrap_or_else(|_| vec![]); + if vec.is_empty() { + warn!("Session dropped, creating new ..."); + break 'inner; + } + let _ = a.get_measure_info(Arc::new(vec)).await; + // a.close_session().await?; + tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await + } + } + Ok(()) +} + +#[derive(Debug, Clone)] +pub struct MonitoringImporter { + ip : String, + login : String, + password : String, + access_token : String, + ts : String, +} + +impl MonitoringImporter { + pub async fn new() -> Self { + MonitoringImporter { + ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()), + login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()), + password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()), + access_token : String::new(), + ts : String::new(), + } + } + async fn is_valid(&self) -> bool { + !self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty() + } + async fn set_ts(&mut self, ts: &str) { + self.ts = ts.to_owned(); + } + pub async fn start_session(&mut self) -> anyhow::Result<()> { + if !self.is_valid().await { + return Err(Error::msg("Invalid eNODE-Monitoring configuration")); + } + let client = Client::new(); + let url = format!("http://{}/e-data-front/auth/login", self.ip); + let fortoken = ForTokenCredentials::new(&self.login, &self.password); + // dbg!(&fortoken); + let client = client + .post(url) + .header("Content-Type", "application/json") + .json(&fortoken); + let resp = client.send().await?; + let auth = resp.json::().await?; + // dbg!(&auth); + self.set_ts(&fortoken.ts).await; + + self.access_token = auth.access_token.to_owned(); + + Ok(()) + } + pub async fn close_session(&mut self) -> anyhow::Result<()> { + let client = Client::new(); + let url = format!("http://{}/e-data-front/auth/logout", self.ip); + let client = client + .post(url) + .header("Content-Type", "application/json") + .header("access-token", &self.access_token); + + let _ = client.send().await?; + + self.access_token.clear(); + Ok(()) + } + pub async fn get_metrics_list(&self) -> anyhow::Result> { + let client = Client::new(); + let mut vec: Vec = Vec::new(); + let url = format!("http://{}/e-cmdb/api/query", self.ip); + let client = client + .post(url) + .header("Content-Type", "application/json") + .header("access-token", &self.access_token) + .json(&Query::default()); + let resp = client.send().await?.text().await?; + // dbg!(&resp.text().await); + let resp: Value = serde_json::from_str(&resp)?; + if let Some(arr) = resp.as_array() { + for measure in arr { + let id = measure.get("id"); + let cls = measure.get("cls"); + if id.is_some() && cls.is_some() { + // todo: later wait for Vaitaliy call of classification + let id = id.unwrap().as_i64().unwrap_or_default(); + let cls = cls.unwrap().as_str().unwrap_or_else(|| ""); + if cls.is_empty() { + return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)")); + } + // let measure_name = format!("{}${}", cls, id); + vec.push(format!("{}${}", cls, id)); + } + } + // dbg!(vec); + } else { + return Err(Error::msg("Invalid JSON in response")); + } + Ok(vec) + } + pub async fn get_measure_info(&self, measures: Arc>) -> anyhow::Result<()> { + let mut sys = sysinfo::System::new(); + sys.refresh_cpu_all(); + // adaptive permition on task spawm to prevent system overload + let sem = Arc::new(Semaphore::new(sys.cpus().len())); + let mut jh_vec = Vec::new(); + let client = Arc::new(Client::new()); + let measures = measures.clone(); + let arc = Arc::new(self.clone()); + // dbg!(&measures.display()); + + // dbg!(&measures.len()); + for measure in measures.chunks(get_chunk_size(measures.len())) { + let permit = sem.clone(); + let arc = arc.clone(); + let client = client.clone(); + let measure = Arc::new(measure.display()); + + let _permit = permit.acquire().await.unwrap(); + + let jh: JoinHandle> = tokio::spawn(async move { + Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await + + }); + jh_vec.push(jh); + } + // let mut vals = Vec::new(); + for event in jh_vec { + match event.await { + Ok(val) => { + if let Ok(val) = val { + match crate::export::Exporter::export_metrics(val).await { + Ok(bytes) => info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes), + Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er), + } + // vals.push(val); + } + }, + Err(er) => println!("Fatal error on async task: {}", er), + } + } + // dbg!(&vals); + // dbg!(&vals.len()); + Ok(()) + } + async fn process_endpoint(measure: Arc, client: Arc, arc: Arc) -> anyhow::Result { + let resp = client + .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure)) + .header("Content-Type", "application/json") + .header("access-token", &arc.access_token) + .send().await? + .text().await?; + tokio::task::yield_now().await; + + let resp: Value = serde_json::from_str(&resp)?; + // let a = Self::extract_metric_data(resp); + + Ok( + PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await + ) + } + fn extract_metric_data(json: Value) -> Pin>> + Send>> { + Box::pin(async move { + return match json { + Value::Object(obj) => { + // let resp: Value = serde_json::from_str(&obj)?; + return Ok(vec![Self::process_value(&obj).await?]) + }, + Value::Array(arr) => { + let mut vec = Vec::new(); + for obj in arr { + if let Ok(mut val) = Self::extract_metric_data(obj).await { + // vec.push(val); + vec.append(&mut val); + } + } + return Ok(vec) + }, + _ => Err(Error::msg("Invalid JSON format")), + } + }) + } + async fn process_value(obj : &Map) -> anyhow::Result { + let id = obj.get("id"); + let val = obj.get("value"); + + if id.is_none() || val.is_none() { + return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response")) + } + let id = id.unwrap().as_str().unwrap_or_else(|| ""); + let val = val.unwrap(); + + if id.is_empty() { + return Err(Error::msg("Empty `id` field. Invalid JSON response")) + } + // pub struct MetricOutput { + // pub id : String, + // #[serde(rename = "type")] + // json_type : String, + // addr : String, + // pub value : Value, + // } + + Ok(MetricOutput { + id : id.to_owned(), + json_type : match val { + Value::Number(val) => { + if val.is_i64() { + "i64".to_owned() + } else if val.is_u64() { + "u64".to_owned() + } else { + "f64".to_owned() + } + }, + _ => "unknown".to_owned(), + }, + addr : "enode.monitoring.api".to_owned(), + value : val.clone() + }) + } +} \ No newline at end of file diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs index 62ddb5c..8b21b39 100644 --- a/crates/api-grub/src/net.rs +++ b/crates/api-grub/src/net.rs @@ -1,82 +1,35 @@ // module to handle unix-socket connection + pulling info from api -use anyhow::{Error, Result}; -use integr_structs::api::ApiConfig; +use anyhow::Result; +// use integr_structs::api::{ApiConfigV2, ProcessedEndpoint}; use log::{error, info}; +use rand::random; use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, Duration}; use reqwest::{Client, Method}; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; +use tokio::task::JoinHandle; +// use tokio::sync::Mutex; +use dotenv::dotenv; +use crate::json::JsonParser; +use crate::export::Exporter; +use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics}; +// use md5::compute; -struct RestMethod; - -impl RestMethod { - pub async fn from_str(method: &str) -> Method { - return match method.trim().to_lowercase().as_str() { - "post" => Method::POST, - "patch" => Method::PATCH, - "put" => Method::PUT, - "delete" => Method::DELETE, - "get" | _ => Method::GET - } - } -} -struct ApiPoll<'a> { - config : &'a mut ApiConfig, - client : Client, -} - -impl<'a> ApiPoll<'a> { - pub async fn new(poll_cfg : &'a mut ApiConfig) -> Self { - Self { - config : poll_cfg, - client : Client::new(), - } - } - // can be weak and with bug test needed - pub async fn change_config(&mut self, conf: ApiConfig) { - *self.config = conf; - } - pub async fn is_default(&self) -> bool { - self.config.endpoints.len() == 0 - } - pub async fn process_polling(&self) -> Result> { - let mut buffer: Vec = vec![]; - // TODO: rewrite nextly to async - for point in &self.config.endpoints { - // let a = self.client.get(&point.url).send().await.unwrap(); - // a.text().await.unwrap(); - match self.client.request(RestMethod::from_str(&point.method).await, &point.url).send().await { - Ok(resp) => { - if !resp.status().is_success() { - error!("ErrorCode in Response from API. Check configuration"); - continue; - } - if let Ok(text) = resp.text().await { - info!("{}: {} - Successfull grubbing info", &point.method.to_uppercase(), &point.url); - buffer.push(text); - } else { - error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url); - } - }, - Err(er) => { - error!("{}: {} - Query crushed due to {}", &point.method.to_uppercase(), &point.url, er); - }, - } - } - match &buffer.len() { - 0 => Err(Error::msg("Error due to API grubbing. Check config" )), - _ => Ok(buffer), - } - } - pub async fn get_delay(&self) -> u32 { - self.config.delay - } -} +// type BufferType = Arc>>; // for api info pulling -pub async fn init_api_grub_mechanism(config: ApiConfig, rx: &mut Receiver) -> Result<()> { - info!("Initializing API-info grubbing mechanism..."); +pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver) -> Result<()> { + info!("Initializing API-info grubbing mechanism :"); + info!("1) Loading vars from .env file if exists..."); + let _ = dotenv().ok(); + let mut config = config; let mut poller = ApiPoll::new(&mut config).await; + info!("2) Api-Poller has initialized"); + let client = Exporter::init(); + info!("3) Exporter has initialized"); + let shared_pool = Arc::new(client); loop { if poller.is_default().await { sleep(Duration::from_secs(5)).await; @@ -87,63 +40,256 @@ pub async fn init_api_grub_mechanism(config: ApiConfig, rx: &mut Receiver Method { + return match method.trim().to_lowercase().as_str() { + "post" => Method::POST, + "patch" => Method::PATCH, + "put" => Method::PUT, + "delete" => Method::DELETE, + "head" => Method::HEAD, + "trace" => Method::TRACE, + "options" => Method::OPTIONS, + "connect" => Method::CONNECT, + "get" | _ => Method::GET + } } - #[test] - async fn check_api_poll_change_config() { - let mut conf1 = ApiConfig::default(); - let conf2 = ApiConfig { endpoints : vec![], delay : 10, }; - let mut poll = ApiPoll::new(&mut conf1).await; - poll.change_config(conf2).await; - assert_eq!(poll.config.delay, 10) +} +struct ApiPoll<'a> { + config : &'a mut Config, + client : Client, +} + +impl<'a> ApiPoll<'a> { + pub async fn new(poll_cfg : &'a mut Config) -> Self { + Self { + config : poll_cfg, + client : Client::new(), + } } - - #[test] - async fn check_api_poll_is_default() { - let mut conf1 = ApiConfig::default(); - let poll = ApiPoll::new(&mut conf1).await; - assert!(poll.is_default().await) + // can be weak and with bug test needed + pub async fn change_config(&mut self, conf: Config) { + *self.config = conf; } - - #[test] - async fn check_api_grubbing_mechanism_on_public_one() { - use log::{set_max_level, LevelFilter}; - - set_max_level(LevelFilter::Off); - let mut conf1 = ApiConfig { - endpoints : vec![ - ApiEndpoint { - url : String::from("https://dummy-json.mock.beeceptor.com/countries"), - method: String::from("get"), - }], - delay : 10, - }; - let conf2 = ApiConfig::default(); - - let mut poll = ApiPoll::new(&mut conf1).await; - assert!(poll.process_polling().await.is_ok()); - - poll.change_config(conf2).await; - assert!(poll.process_polling().await.is_err()); + pub async fn is_default(&self) -> bool { + self.config.is_default().await } -} \ No newline at end of file + // pub async fn get_delay(&self) -> u32 { + // self.config.timeout + // } + pub async fn process_metrics( + service_id: Arc, + metrics: Arc, + creds: Credentials, + // exporter: Arc + ) -> Result<()> { + // processing metrics + // let mut req = Client::new() + // // .user_agent("api_grub/integration_module") + // .get(&metrics.url); + use std::hash::DefaultHasher; + + let rand = random::(); + let mut hash = DefaultHasher::new(); + rand.hash(&mut hash); + + let client = Client::builder() + .user_agent(format!("api-grabber-{}", hash.finish())); + let mut req = client.build().unwrap().get(&metrics.url); + + let login = &creds.endpoint.login; + let password = &creds.endpoint.password; + let api_key = &creds.endpoint.api_key; + + if !login.is_empty() && !password.is_empty() { + // dbg!("kjgbkasgksjd"); + req = req.basic_auth(login, Some(password)); + } + if !api_key.is_empty() { + // req = req.bearer_auth(&api_key); + // req = req.header("authorization", "bearer "); + + req = req.header("accept", "application/json"); + req = req.header("x-api-key", api_key); + + // req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"]) + } + // dbg!(&req); + // let (client, res) = req.build_split(); + // let res = res.unwrap(); + // res.url_mut().is_special() + + + + // dbg!(client); + // dbg!(res); + // todo!(); + + match req.send().await { + Ok(resp) => { + // dbg!(&resp.text().await); + if let Ok(response) = resp.text().await { + match serde_json::to_value(&response) { + Err(er) => { + error!("Bad JSON in response. Error: {}", er); + }, + Ok(_) => { + let endpoint_name = &metrics.name; + let preproc = JsonParser::parse(&metrics.measure, &response); + // dbg!(serde_json::to_string_pretty(&preproc)); + let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc); + match Exporter::export_metrics(preproc).await { + Ok(bytes) => { + info!("Successfully exported {} bytes of metrics data to Prometheus", bytes); + }, + Err(er) => { + error!("Failed to export data to Prometheus due to {}", er); + }, + } + }, + } + } else { + error!("Bad response from {}. No data", &metrics.url); + } + }, + Err(er) => { + error!("Cannot API data from {} due to : {}", &metrics.url, er); + }, + } + Ok(()) + } + pub async fn process_endpoint( + // client : Arc, + config : Arc, + creds : Credentials, + // exporter : Arc + ) -> Result<()> { + // + let period = config.get_period().unwrap_or(0); + let timeout = config.get_timeout().unwrap_or(5); + let metrics = Arc::new(config.metrics.clone()); + let service_id = Arc::new(config.id.clone()); + loop { + // let exporter = exporter.clone(); + let creds = creds.clone(); + let metrics = metrics.clone(); + let service_id = service_id.clone(); + let mut jh = Vec::>>::new(); + + for idx in 0..metrics.len() { + let creds = creds.clone(); + let metrics = metrics.clone(); + let service_id = service_id.clone(); + let event = tokio::spawn(async move { + Self::process_metrics( + service_id.clone(), + metrics[idx].clone().into(), + creds.clone(), + ).await + }); + jh.push(event); + } + info!("Initializing another {} subjob(s) for `{}` service", + jh.len(), + &service_id + ); + + for i in jh { + let _ = i.await; + } + // processing + sleep(Duration::from_secs(timeout)).await + } + Ok(()) + } + pub async fn process_polling(&self, exporter: Arc) -> Result<()> { + // let buffer: BufferType = Arc::new(Mutex::new(vec![])); + // let mut join_handles: Vec>> = vec![]; + // let client = Arc::new(self.client.clone()); + let config = Arc::new(self.config.clone()); + let endpoints: Vec> = ConfigEndpoint::from_config(config.clone()); + let mut join_handles: Vec>> = vec![]; + + for (idx, _) in config.config.iter().enumerate() { + // let for_creds = endpoints[idx].clone(); + let creds = Credentials::from_config_endpoint(endpoints[idx].clone()); + let endpoint = endpoints[idx].clone(); + // let client = client.clone(); + let exporter = exporter.clone(); + let join_handler = tokio::spawn(async move { + Self::process_endpoint( + // client, + endpoint, + creds, + // exporter.clone() + ).await + }); + join_handles.push(join_handler); + } + info!("Initializing {} task(s) for current config", join_handles.len()); + for i in join_handles { + let _ = i.await; + } + Ok(()) + } +} + +// #[cfg(test)] +// mod net_unittests { +// use super::*; +// use tokio::test; + +// #[test] +// async fn check_str_to_rest_method() { +// assert_eq!(RestMethod::from_str("get").await, Method::GET); +// assert_eq!(RestMethod::from_str("post").await, Method::POST); +// assert_eq!(RestMethod::from_str("patch").await, Method::PATCH); +// assert_eq!(RestMethod::from_str("put").await, Method::PUT); +// assert_eq!(RestMethod::from_str("delete").await, Method::DELETE); +// assert_eq!(RestMethod::from_str("invalid_method").await, Method::GET); +// } +// #[test] +// async fn check_api_poll_change_config() { +// let mut conf1 = ApiConfigV2::default(); +// let conf2 = ApiConfigV2::pattern(); +// let mut poll = ApiPoll::new(&mut conf1).await; +// poll.change_config(conf2).await; +// assert_eq!(poll.config.timeout, 1) +// } + +// #[test] +// async fn check_api_poll_is_default() { +// let mut conf1 = ApiConfigV2::default(); +// let poll = ApiPoll::new(&mut conf1).await; +// assert!(poll.is_default().await) +// } + +// #[test] +// async fn check_api_grubbing_mechanism_on_public_one() { +// use log::{set_max_level, LevelFilter}; + +// set_max_level(LevelFilter::Off); +// let mut conf1 = ApiConfigV2::pattern(); +// let conf2 = ApiConfigV2::default(); +// let exporter = Arc::new(Exporter::init()); + +// let mut poll = ApiPoll::new(&mut conf1).await; +// assert!(poll.process_polling(exporter.clone()).await.is_ok()); + +// dbg!(&poll.config); +// poll.change_config(conf2).await; +// dbg!(&poll.config); +// assert!(poll.process_polling(exporter.clone()).await.is_err()); +// } +// } \ No newline at end of file diff --git a/crates/config-delivery/Cargo.toml b/crates/config-delivery/Cargo.toml deleted file mode 100644 index d6686e7..0000000 --- a/crates/config-delivery/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "config-delivery" -version = "0.3.4" -edition = "2021" - -[dependencies] -dotenv = "0.15.0" -rand = "0.8.5" -serde = { version = "1.0.217", features = ["derive"] } -serde_json = "1.0.135" -tokio = { version = "1.43.0", features = ["full"] } -tokio-websockets = { version = "^0.11.0", features = ["client", "openssl", "rand"] } -integr-structs = {path = "../integr-structs"} -anyhow = "1.0.95" -env_logger = "0.11.6" -log = "0.4.25" -chrono = "0.4.39" diff --git a/crates/config-delivery/src/delivery.rs b/crates/config-delivery/src/delivery.rs deleted file mode 100644 index 38afe4b..0000000 --- a/crates/config-delivery/src/delivery.rs +++ /dev/null @@ -1,43 +0,0 @@ -// mod to communicate with api-grub and preproc services -// using Unix-Socket Client - -use anyhow::{Error, Result}; -use integr_structs::api::ApiConfig; -use tokio::time::{sleep, Instant}; -use tokio::net::UnixStream; -use std::env; - -enum UnixSocketConsumer { - ApiGrubber, - Preproc, -} -// to create us-client -struct UnixSocketClient; - -impl UnixSocketConsumer { - pub async fn get_stream_object(&self) -> Result { - let socket_file = match self { - UnixSocketConsumer::ApiGrubber => env::var("API_GRUBBER_SOCKET")?, - UnixSocketConsumer::Preproc => env::var("PREPROC_SOCKET")?, - }; - UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg("Cannot create Unix-Socket client"))) - } -} - -async fn check_unix_socket_file(path: &str) -> bool { - std::path::Path::new(path).exists() -} - -#[cfg(tests)] -mod delivery_unittests { - use super::*; - use tokio::test; - - // - #[test] - async fn check_api_unix_socket_client_creation() { assert!(true); } - - #[test] - async fn check_preproc_unix_socket_client_creation() { assert!(true); } - // -} \ No newline at end of file diff --git a/crates/config-delivery/src/integration.rs b/crates/config-delivery/src/integration.rs deleted file mode 100644 index e69de29..0000000 diff --git a/crates/config-delivery/src/main.rs b/crates/config-delivery/src/main.rs deleted file mode 100644 index 2be1247..0000000 --- a/crates/config-delivery/src/main.rs +++ /dev/null @@ -1,24 +0,0 @@ -mod delivery; -mod integration; -mod logger; - -use logger::setup_logger; -use dotenv::dotenv; -use anyhow::Result; -use tokio::sync::mpsc; -use log::info; - -// Arch : -// 1) 2 Unix-Socket client (for api grub and preproc) :: i think its a continious process for events when services are unavailable -// 2) mpsc beetween `delivery` and `integration` :: -// 3) websocket client in `integration` to pull configs from Monitoring System :: - -#[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<()> { - let _ = setup_logger().await?; - dotenv().ok(); - info!("Pulling env vars from .env file if exists ..."); - println!("Hello, world!"); - - Ok(()) -} diff --git a/crates/integr-structs/Cargo.toml b/crates/integr-structs/Cargo.toml index fdf9743..d510ba7 100644 --- a/crates/integr-structs/Cargo.toml +++ b/crates/integr-structs/Cargo.toml @@ -4,5 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1.0.95" +chrono = "0.4.40" serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.135" diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index 70f07a4..0c646f8 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -1,4 +1,10 @@ +use std::collections::HashMap; use serde::{Serialize, Deserialize}; +use serde_json::{ to_string_pretty, Value }; +use anyhow::Result; +use std::sync::Arc; +use std::fmt::Display; +use chrono::{DateTime, Local}; #[derive(Serialize, Deserialize, Debug)] @@ -21,4 +27,391 @@ impl Default for ApiConfig { delay : 0, } } +} + +// v2 +#[derive(Serialize, Deserialize, Debug, PartialEq)] +pub struct ApiConfigV2 { + pub id : u64, + #[serde(default)] + pub template : Vec