valid (nearly) db export and so on
parent
72c59dbce9
commit
0bd9b55f87
5
.env
5
.env
|
|
@ -1,3 +1,8 @@
|
||||||
CONFIG_SERVER_CREDS = "ws://127.0.0.1:8080"
|
CONFIG_SERVER_CREDS = "ws://127.0.0.1:8080"
|
||||||
API_GRUBBER_SOCKET = "api-grub.sock"
|
API_GRUBBER_SOCKET = "api-grub.sock"
|
||||||
PREPROC_SOCKET = "preproc.sock"
|
PREPROC_SOCKET = "preproc.sock"
|
||||||
|
|
||||||
|
DB_HOST = "192.168.2.37"
|
||||||
|
DB_USER = "vlad"
|
||||||
|
DB_PASSWORD = "vlad"
|
||||||
|
DB_DBNAME = "vks"
|
||||||
|
|
@ -13,3 +13,6 @@ log = "0.4.25"
|
||||||
anyhow = "1.0.95"
|
anyhow = "1.0.95"
|
||||||
chrono = "0.4.39"
|
chrono = "0.4.39"
|
||||||
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
|
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
|
||||||
|
deadpool-postgres = "0.14.1"
|
||||||
|
tokio-postgres = "0.7.12"
|
||||||
|
dotenv = "0.15.0"
|
||||||
|
|
|
||||||
|
|
@ -2,14 +2,70 @@
|
||||||
use anyhow::{Error, Result};
|
use anyhow::{Error, Result};
|
||||||
use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
|
use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
|
use serde_json::Value;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use reqwest::{Client, Method};
|
use reqwest::{Client, Method};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use tokio::sync::Mutex;
|
// use tokio::sync::Mutex;
|
||||||
|
use tokio_postgres::types::ToSql;
|
||||||
|
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
|
||||||
|
use tokio_postgres::NoTls;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use std::env;
|
||||||
|
|
||||||
|
// type BufferType = Arc<Mutex<Vec<String>>>;
|
||||||
|
|
||||||
|
struct Exporter {
|
||||||
|
pool : Option<Pool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Exporter {
|
||||||
|
fn config_construct() -> Result<Config> {
|
||||||
|
let mut cfg = Config::new();
|
||||||
|
cfg.host = Some(env::var("DB_HOST")?);
|
||||||
|
cfg.dbname = Some(env::var("DB_DBNAME")?);
|
||||||
|
cfg.user = Some(env::var("DB_USER")?);
|
||||||
|
cfg.password = Some(env::var("DB_PASSWORD")?);
|
||||||
|
Ok(cfg)
|
||||||
|
}
|
||||||
|
fn pool_construct() -> Option<Pool> {
|
||||||
|
return match Self::config_construct() {
|
||||||
|
Ok(config) => {
|
||||||
|
if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) {
|
||||||
|
info!("Connected to PostgreSQL");
|
||||||
|
return Some(pool);
|
||||||
|
}
|
||||||
|
None
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Bad DB credentials or it's unreachable");
|
||||||
|
None
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
||||||
|
pub fn init() -> Self {
|
||||||
|
Self {
|
||||||
|
pool : Self::pool_construct()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
||||||
|
if let Some(pool) = &self.pool {
|
||||||
|
return Some(pool.get().await.ok()?);
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||||
|
// client.
|
||||||
|
let query = "INSERT INTO metrics (body) VALUES ($1);";
|
||||||
|
let _ = client.execute(query, &[&query]).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
type BufferType = Arc<Mutex<Vec<String>>>;
|
|
||||||
struct RestMethod;
|
struct RestMethod;
|
||||||
|
|
||||||
impl RestMethod {
|
impl RestMethod {
|
||||||
|
|
@ -46,34 +102,43 @@ impl<'a> ApiPoll<'a> {
|
||||||
pub async fn is_default(&self) -> bool {
|
pub async fn is_default(&self) -> bool {
|
||||||
self.config.template.len() == 0
|
self.config.template.len() == 0
|
||||||
}
|
}
|
||||||
pub async fn process_polling(&self) -> Result<()> {
|
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
|
||||||
let buffer: BufferType = Arc::new(Mutex::new(vec![]));
|
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
|
||||||
let mut join_handles: Vec<JoinHandle<()>> = vec![];
|
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
let client = Arc::new(self.client.clone());
|
let client = Arc::new(self.client.clone());
|
||||||
let template = Arc::new(self.config.template.clone());
|
let template = Arc::new(self.config.template.clone());
|
||||||
|
|
||||||
// TODO: rewrite nextly to async
|
// TODO: rewrite nextly to async
|
||||||
for point in template.iter() {
|
for point in template.iter() {
|
||||||
let point = Arc::new(point.clone());
|
let point = Arc::new(point.clone());
|
||||||
let buffer = buffer.clone();
|
// let buffer = buffer.clone();
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
// let a = self.client.get(&point.url).send().await.unwrap();
|
let exporter = exporter.clone();
|
||||||
// a.text().await.unwrap();
|
|
||||||
let endpoint_processer = tokio::spawn(async move {
|
let endpoint_processer = tokio::spawn(async move {
|
||||||
let point = point.clone();
|
let point = point.clone();
|
||||||
match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
|
match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
if !resp.status().is_success() {
|
if !resp.status().is_success() {
|
||||||
error!("ErrorCode in Response from API. Check configuration");
|
error!("ErrorCode in Response from API. Check configuration");
|
||||||
return;
|
return Err(Error::msg("Error during sending request"));
|
||||||
}
|
}
|
||||||
if let Ok(text) = resp.text().await {
|
if let Ok(text) = resp.text().await {
|
||||||
//
|
//
|
||||||
let a = ProcessedEndpoint::from_target_response(&text, &point);
|
let metrics = ProcessedEndpoint::from_target_response(&text, &point)?;
|
||||||
println!("{}", a.unwrap());
|
|
||||||
//
|
//
|
||||||
let mut buffer = buffer.lock().await;
|
if let Some(conn) = exporter.get_connection_from_pool().await {
|
||||||
buffer.push(text);
|
if let Err(er) = Exporter::export_data(conn, &metrics).await {
|
||||||
|
error!("Cannot export data to DB during to: {}", er);
|
||||||
|
return Err(Error::msg("Error during exporting data to DB"));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if !exporter.is_no_connection() {
|
||||||
|
return Err(Error::msg("Error during getting connection from pool"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// let mut buffer = buffer.lock().await;
|
||||||
|
// buffer.push(text);
|
||||||
} else {
|
} else {
|
||||||
error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
|
error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
|
||||||
}
|
}
|
||||||
|
|
@ -82,6 +147,7 @@ impl<'a> ApiPoll<'a> {
|
||||||
error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url);
|
error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
});
|
});
|
||||||
join_handles.push(endpoint_processer);
|
join_handles.push(endpoint_processer);
|
||||||
}
|
}
|
||||||
|
|
@ -90,14 +156,14 @@ impl<'a> ApiPoll<'a> {
|
||||||
let _ = i.await;
|
let _ = i.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let buffer = buffer.lock().await;
|
// let buffer = buffer.lock().await;
|
||||||
match &buffer.len() {
|
// match &buffer.len() {
|
||||||
0 => Err(Error::msg("Error due to API grubbing. Check config" )),
|
// 0 => Err(Error::msg("Error due to API grubbing. Check config" )),
|
||||||
_ => {
|
// _ => {
|
||||||
Ok(())
|
// Ok(())
|
||||||
},
|
// },
|
||||||
}
|
// }
|
||||||
// Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn get_delay(&self) -> u32 {
|
pub async fn get_delay(&self) -> u32 {
|
||||||
self.config.timeout
|
self.config.timeout
|
||||||
|
|
@ -107,10 +173,15 @@ impl<'a> ApiPoll<'a> {
|
||||||
// for api info pulling
|
// for api info pulling
|
||||||
pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiConfigV2>) -> Result<()> {
|
pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiConfigV2>) -> Result<()> {
|
||||||
info!("Initializing API-info grubbing mechanism...");
|
info!("Initializing API-info grubbing mechanism...");
|
||||||
|
info!("Loading vars from .env file if exists...");
|
||||||
|
let _ = dotenv().ok();
|
||||||
|
|
||||||
let mut config = config;
|
let mut config = config;
|
||||||
let mut poller = ApiPoll::new(&mut config).await;
|
let mut poller = ApiPoll::new(&mut config).await;
|
||||||
// let arc_poller = Arc::new(poller);
|
let client = Exporter::init();
|
||||||
|
let shared_pool = Arc::new(client);
|
||||||
loop {
|
loop {
|
||||||
|
let shared_pool = shared_pool.clone();
|
||||||
if poller.is_default().await {
|
if poller.is_default().await {
|
||||||
sleep(Duration::from_secs(5)).await;
|
sleep(Duration::from_secs(5)).await;
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -120,7 +191,7 @@ pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiC
|
||||||
info!("Config changed");
|
info!("Config changed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("Data from API: {:?}", poller.process_polling().await);
|
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
|
||||||
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -164,11 +235,12 @@ mod net_unittests {
|
||||||
set_max_level(LevelFilter::Off);
|
set_max_level(LevelFilter::Off);
|
||||||
let mut conf1 = ApiConfigV2::pattern();
|
let mut conf1 = ApiConfigV2::pattern();
|
||||||
let conf2 = ApiConfigV2::default();
|
let conf2 = ApiConfigV2::default();
|
||||||
|
let exporter = Arc::new(Exporter::init());
|
||||||
|
|
||||||
let mut poll = ApiPoll::new(&mut conf1).await;
|
let mut poll = ApiPoll::new(&mut conf1).await;
|
||||||
assert!(poll.process_polling().await.is_ok());
|
assert!(poll.process_polling(exporter.clone()).await.is_ok());
|
||||||
|
|
||||||
poll.change_config(conf2).await;
|
poll.change_config(conf2).await;
|
||||||
assert!(poll.process_polling().await.is_err());
|
assert!(poll.process_polling(exporter.clone()).await.is_err());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue