refactor + async tasks for mods

pull/6/head
prplV 2025-02-13 12:20:54 +03:00
parent abfb21f03b
commit 8630827118
6 changed files with 178 additions and 142 deletions

View File

@ -0,0 +1,54 @@
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
use tokio_postgres::NoTls;
use std::env;
use anyhow::{Error, Result};
use log::{info, error};
pub struct Exporter {
pool : Option<Pool>,
}
impl Exporter {
fn config_construct() -> Result<Config> {
let mut cfg = Config::new();
cfg.host = Some(env::var("DB_HOST")?);
cfg.dbname = Some(env::var("DB_DBNAME")?);
cfg.user = Some(env::var("DB_USER")?);
cfg.password = Some(env::var("DB_PASSWORD")?);
Ok(cfg)
}
fn pool_construct() -> Option<Pool> {
return match Self::config_construct() {
Ok(config) => {
if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) {
info!("Connected to PostgreSQL");
return Some(pool);
}
None
},
Err(_) => {
error!("Bad DB credentials or it's unreachable");
None
},
}
}
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
pub fn init() -> Self {
Self {
pool : Self::pool_construct()
}
}
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
if let Some(pool) = &self.pool {
return Some(pool.get().await.ok()?);
}
None
}
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
// client.
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
let _ = client.query(&query, &[&metrics]).await?;
Ok(())
}
}

View File

@ -1,22 +1,22 @@
// use serde::{de::value, Serialize}; // use serde::{de::value, Serialize};
use serde_json::{json, Value}; use serde_json::{json, Value};
use integr_structs::api::v3::{TestMetric, TestMetricOutput}; use integr_structs::api::v3::{Metric, MetricOutput};
pub struct JsonParser; pub struct JsonParser;
impl JsonParser { impl JsonParser {
pub fn parse(targets: &Vec<TestMetric>, json: &str) -> Value { pub fn parse(targets: &Vec<Metric>, json: &str) -> Value {
let mut res_vec: Vec<TestMetricOutput> = Vec::new(); let mut res_vec: Vec<MetricOutput> = Vec::new();
for target in targets { for target in targets {
let metric = match target.addr.contains("[") { let metric = match target.addr.contains("[") {
true => JsonParser::get_sum_of_metrics_in_array(target, json), true => JsonParser::get_sum_of_metrics_in_array(target, json),
false => JsonParser::get_metric(target, json), false => JsonParser::get_metric(target, json),
}; };
res_vec.push(TestMetricOutput::new_with_slices(&target.id, &target.json_type, metric)); res_vec.push(MetricOutput::new_with_slices(&target.id, &target.json_type, metric));
} }
serde_json::to_value(res_vec).unwrap_or(Value::Null) serde_json::to_value(res_vec).unwrap_or(Value::Null)
} }
fn get_sum_of_metrics_in_array(target: &TestMetric, json: &str) -> Value { fn get_sum_of_metrics_in_array(target: &Metric, json: &str) -> Value {
if target.addr.is_empty() { if target.addr.is_empty() {
return Value::Null; return Value::Null;
} }
@ -133,7 +133,7 @@ impl JsonParser {
values values
} }
fn get_metric(target: &TestMetric, json: &str) -> Value { fn get_metric(target: &Metric, json: &str) -> Value {
if target.addr.is_empty() { if target.addr.is_empty() {
return Value::Null; return Value::Null;
} }
@ -151,26 +151,3 @@ impl JsonParser {
value_json value_json
} }
} }
// pub struct TestMetric {
// pub id : String,
// pub json_type : String,
// pub addr : String,
// }
// #[derive(Serialize)]
// struct TestMetricOutput {
// id : String,
// json_type : String,
// value : Value,
// }
// impl TestMetricOutput {
// fn new_with_slices(id : &str, json_type : &str, value : Value) -> Self {
// TestMetricOutput {
// id : id.to_string(),
// json_type : json_type.to_string(),
// value : value,
// }
// }
// }

View File

@ -2,14 +2,16 @@ mod config;
mod net; mod net;
mod logger; mod logger;
mod json; mod json;
mod export;
use anyhow::Result; use anyhow::Result;
use integr_structs::api::ApiConfigV2; use integr_structs::api::ApiConfigV2;
use logger::setup_logger; use logger::setup_logger;
use log::{info, warn}; // use log::{info, warn};
use config::{pull_local_config, init_config_grub_mechanism}; use config::{pull_local_config, init_config_grub_mechanism};
use net::init_api_grub_mechanism; use net::init_api_grub_mechanism;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use log::{error, info, warn};
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()>{ async fn main() -> Result<()>{
@ -23,10 +25,34 @@ async fn main() -> Result<()>{
let (tx, mut rx) = mpsc::channel::<ApiConfigV2>(1); let (tx, mut rx) = mpsc::channel::<ApiConfigV2>(1);
// futures // futures
// todo : rewrite with spawn // todo : rewrite with spawn
let config_fut = init_config_grub_mechanism(&tx); // let config_fut = init_config_grub_mechanism(&tx);
let grub_fut = init_api_grub_mechanism(config, &mut rx); // let grub_fut = init_api_grub_mechanism(config, &mut rx);
let _ = tokio::join!(config_fut, grub_fut); let event_config = tokio::spawn(async move {
match init_config_grub_mechanism(&tx).await {
Ok(_) => {
info!("Config task deinitialized");
},
Err(er) => {
error!("Config task returned an error : {}", er);
},
}
});
let event_grub = tokio::spawn(async move {
match init_api_grub_mechanism(config, &mut rx).await {
Ok(_) => {
info!("Grabing task deinitialized");
},
Err(er) => {
error!("Grabing task returned an error : {}", er);
},
}
});
let events_handler = vec![event_config, event_grub];
for event in events_handler {
let _ = event.await;
}
// let _ = tokio::join!(config_fut, grub_fut);
Ok(()) Ok(())
} }

View File

@ -8,61 +8,41 @@ use reqwest::{Client, Method};
use std::sync::Arc; use std::sync::Arc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
// use tokio::sync::Mutex; // use tokio::sync::Mutex;
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
use tokio_postgres::NoTls;
use dotenv::dotenv; use dotenv::dotenv;
use std::env; use crate::json::JsonParser;
use crate::export::Exporter;
use integr_structs::api::v3::Config;
// type BufferType = Arc<Mutex<Vec<String>>>; // type BufferType = Arc<Mutex<Vec<String>>>;
struct Exporter { // for api info pulling
pool : Option<Pool>, pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiConfigV2>) -> Result<()> {
info!("Initializing API-info grubbing mechanism...");
info!("Loading vars from .env file if exists...");
let _ = dotenv().ok();
let mut config = config;
let mut poller = ApiPoll::new(&mut config).await;
let client = Exporter::init();
let shared_pool = Arc::new(client);
loop {
let shared_pool = shared_pool.clone();
if poller.is_default().await {
sleep(Duration::from_secs(5)).await;
} else {
if rx.len() > 0 {
if let Some(conf) = rx.recv().await {
poller.change_config(conf).await;
info!("Config changed");
}
}
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
}
}
// Ok(())
} }
impl Exporter {
fn config_construct() -> Result<Config> {
let mut cfg = Config::new();
cfg.host = Some(env::var("DB_HOST")?);
cfg.dbname = Some(env::var("DB_DBNAME")?);
cfg.user = Some(env::var("DB_USER")?);
cfg.password = Some(env::var("DB_PASSWORD")?);
Ok(cfg)
}
fn pool_construct() -> Option<Pool> {
return match Self::config_construct() {
Ok(config) => {
if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) {
info!("Connected to PostgreSQL");
return Some(pool);
}
None
},
Err(_) => {
error!("Bad DB credentials or it's unreachable");
None
},
}
}
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
pub fn init() -> Self {
Self {
pool : Self::pool_construct()
}
}
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
if let Some(pool) = &self.pool {
return Some(pool.get().await.ok()?);
}
None
}
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
// client.
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
let _ = client.query(&query, &[&metrics]).await?;
Ok(())
}
}
struct RestMethod; struct RestMethod;
@ -187,34 +167,6 @@ impl<'a> ApiPoll<'a> {
} }
} }
// for api info pulling
pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiConfigV2>) -> Result<()> {
info!("Initializing API-info grubbing mechanism...");
info!("Loading vars from .env file if exists...");
let _ = dotenv().ok();
let mut config = config;
let mut poller = ApiPoll::new(&mut config).await;
let client = Exporter::init();
let shared_pool = Arc::new(client);
loop {
let shared_pool = shared_pool.clone();
if poller.is_default().await {
sleep(Duration::from_secs(5)).await;
} else {
if rx.len() > 0 {
if let Some(conf) = rx.recv().await {
poller.change_config(conf).await;
info!("Config changed");
}
}
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
}
}
// Ok(())
}
#[cfg(test)] #[cfg(test)]
mod net_unittests { mod net_unittests {
use super::*; use super::*;

View File

@ -151,23 +151,49 @@ pub mod v3 {
// in config // in config
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct TestMetric { pub struct Metric {
pub id : String, pub id : String,
#[serde(rename = "type")] #[serde(rename = "type")]
pub json_type : String, pub json_type : String,
pub addr : String, pub addr : String,
} }
#[derive(Deserialize)]
pub struct Metrics {
pub name : String,
pub url : String,
#[serde(default)]
pub measure : Vec<Metric>
}
#[derive(Deserialize)]
pub struct ConfigEndpoint {
ip : String,
login : String,
#[serde(rename = "pass")]
password : String,
api_key : String,
period : String,
timeout : String,
#[serde(default)]
metrics : Vec<Metrics>,
}
#[derive(Deserialize)]
pub struct Config {
config : Vec<ConfigEndpoint>,
}
// to prometheus and nmns
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct TestMetricOutput { pub struct MetricOutput {
id : String, id : String,
#[serde(rename = "type")] #[serde(rename = "type")]
json_type : String, json_type : String,
value : Value, value : Value,
} }
impl TestMetricOutput { impl MetricOutput {
pub fn new_with_slices(id : &str, json_type : &str, value : Value) -> Self { pub fn new_with_slices(id : &str, json_type : &str, value : Value) -> Self {
TestMetricOutput { MetricOutput {
id : id.to_string(), id : id.to_string(),
json_type : json_type.to_string(), json_type : json_type.to_string(),
value : value, value : value,

View File

@ -1,31 +1,32 @@
{ {
"id" : 1 , "config": [
"template" : {
[{ "id":"demo_vcs_vinteo_dev_api",
"id" :"mock_api_1", "login" : "",
"name" : "Mock / ", "pass" : "",
"url" : "http://127.0.0.1:8081/", "api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
"method" : "GET", "period" : "",
"measure" : "timeout" : "3",
[ "metrics" : [
"operation", "response", "empty_field" {
"name": "conferences",
"url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
"measure": [
{ "id":"number", "type": "text", "addr": "data.conferences[].number" },
{ "id":"total", "type": "integer", "addr": "data.total" },
{ "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
{ "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
{ "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
] ]
}, },
{ {
"id" :"mock_api_2", "name": "abonents",
"name" : "Mock /ping ", "url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
"url" : "http://127.0.0.1:8081/ping", "measure": [
"method" : "GET", { "id":"total", "type": "integer", "addr": "data.total" }
"measure" :
[
"operation", "response", "empty_field"
] ]
} }
], ]
"ip_address" : "127.0.0.1:8081", }
"login" : "", ]
"pass" : "" ,
"api_key" : "908c709827bd40n98r7209837x98273",
"period" : 10,
"timeout" : 2
} }