Compare commits

...

4 Commits

Author SHA1 Message Date
prplV 0fe75be43b final conf structs and +creds struct to contain refs on existing values 2025-02-13 12:31:52 +03:00
prplV 8630827118 refactor + async tasks for mods 2025-02-13 12:20:54 +03:00
prplV abfb21f03b - dbg and + serde rename 2025-02-13 10:27:43 +03:00
prplV 5dae0fe2a5 json mod added 2025-02-13 10:23:38 +03:00
6 changed files with 387 additions and 108 deletions

View File

@ -0,0 +1,54 @@
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
use tokio_postgres::NoTls;
use std::env;
use anyhow::{Error, Result};
use log::{info, error};
/// PostgreSQL exporter: owns an optional connection pool built from
/// DB_* environment variables at init time.
pub struct Exporter {
    // `None` when the pool could not be constructed (bad/missing env config
    // or pool creation failure) — see `pool_construct`.
    pool : Option<Pool>,
}
impl Exporter {
    /// Build a deadpool config from the DB_HOST / DB_DBNAME / DB_USER /
    /// DB_PASSWORD environment variables.
    ///
    /// # Errors
    /// Returns an error when any of the variables is missing or invalid.
    fn config_construct() -> Result<Config> {
        let mut cfg = Config::new();
        cfg.host = Some(env::var("DB_HOST")?);
        cfg.dbname = Some(env::var("DB_DBNAME")?);
        cfg.user = Some(env::var("DB_USER")?);
        cfg.password = Some(env::var("DB_PASSWORD")?);
        Ok(cfg)
    }

    /// Try to create a connection pool.
    ///
    /// Returns `None` when the env config is incomplete (logged) or when
    /// pool creation fails (silent, matching previous behavior).
    fn pool_construct() -> Option<Pool> {
        let config = match Self::config_construct() {
            Ok(config) => config,
            Err(_) => {
                error!("Bad DB credentials or it's unreachable");
                return None;
            }
        };
        match config.create_pool(Some(Runtime::Tokio1), NoTls) {
            Ok(pool) => {
                // NOTE(review): creating the pool does not open a connection;
                // the first `get()` does. Log text kept for compatibility.
                info!("Connected to PostgreSQL");
                Some(pool)
            }
            Err(_) => None,
        }
    }

    /// True when no pool could be constructed at init time.
    pub fn is_no_connection(&self) -> bool {
        self.pool.is_none()
    }

    /// Construct the exporter, eagerly attempting pool construction.
    pub fn init() -> Self {
        Self {
            pool: Self::pool_construct(),
        }
    }

    /// Check out a client from the pool; `None` when there is no pool
    /// or the checkout fails.
    pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
        self.pool.as_ref()?.get().await.ok()
    }

    /// Insert one metrics payload into the `metrics` table.
    ///
    /// Uses a cached prepared statement; `execute` (not `query`) since the
    /// INSERT returns no rows of interest.
    ///
    /// # Errors
    /// Propagates statement-preparation and execution errors.
    pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
        let stmt = client
            .prepare_cached("INSERT INTO metrics (body) VALUES ($1);")
            .await?;
        client.execute(&stmt, &[&metrics]).await?;
        Ok(())
    }
}

153
crates/api-grub/src/json.rs Normal file
View File

@ -0,0 +1,153 @@
// use serde::{de::value, Serialize};
use serde_json::{json, Value};
use integr_structs::api::v3::{Metric, MetricOutput};
/// Stateless helper that resolves configured metric addresses (dotted
/// paths, optionally with one array segment like `a.b[].c` / `a.b[tag].c`)
/// against a raw JSON payload.
pub struct JsonParser;

impl JsonParser {
    /// Resolve every metric in `targets` against `json` and serialize the
    /// results as a JSON array of `MetricOutput`s.
    ///
    /// Addresses containing `[` go through the array path
    /// (`get_sum_of_metrics_in_array`); plain dotted paths through
    /// `get_metric`. Returns `Value::Null` if final serialization fails.
    pub fn parse(targets: &Vec<Metric>, json: &str) -> Value {
        let mut outputs: Vec<MetricOutput> = Vec::with_capacity(targets.len());
        for target in targets {
            let metric = if target.addr.contains('[') {
                Self::get_sum_of_metrics_in_array(target, json)
            } else {
                Self::get_metric(target, json)
            };
            outputs.push(MetricOutput::new_with_slices(&target.id, &target.json_type, metric));
        }
        serde_json::to_value(outputs).unwrap_or(Value::Null)
    }

    /// Walk a dotted address containing an array segment and collect the
    /// addressed values from every element of that array.
    ///
    /// `b[]` yields the values alone; `b[tag]` additionally pairs each value
    /// with the element's `tag` field. Returns `Value::Null` when the
    /// address is empty or a path segment is missing.
    fn get_sum_of_metrics_in_array(target: &Metric, json: &str) -> Value {
        if target.addr.is_empty() {
            return Value::Null;
        }
        let mut value_json: Value = serde_json::from_str(json).unwrap_or(Value::Null);
        let segments: Vec<&str> = target.addr.split_terminator('.').collect();
        // Tag name requested inside `[...]`; empty when none was given.
        let mut key_tag = String::new();
        for (global_idx, &segment) in segments.iter().enumerate() {
            // Split `name[tag]` into lookup key and tag name. Byte-based
            // `split_at` fixes the old byte-index/char-index mismatch for
            // non-ASCII keys.
            let key = match segment.find('[') {
                Some(bracket) => {
                    let (name, bracketed) = segment.split_at(bracket);
                    key_tag = bracketed
                        .trim_start_matches('[')
                        .trim_end_matches(']')
                        .to_owned();
                    name.to_owned()
                }
                None => segment.to_owned(),
            };
            match value_json.get(&key) {
                Some(Value::Array(array)) => {
                    // Remaining path segments are resolved inside each element.
                    let tail: Vec<String> = segments[global_idx + 1..]
                        .iter()
                        .map(|&s| s.to_owned())
                        .collect();
                    let resolved = Self::get_values_in_array(array, &tail);
                    if key_tag.is_empty() {
                        return resolved.into();
                    }
                    return Self::slice_with_tags_in_array(array, &resolved, &key_tag).into();
                }
                Some(other) => value_json = other.clone(),
                None => return Value::Null,
            }
        }
        value_json
    }

    /// Pair each metric object in `metrics` with the `tag_name` field of the
    /// corresponding element of `array`, producing `{tag_name: .., <metric>..}`
    /// objects. Falls back to `metrics` unchanged when the tag is empty or
    /// no pair could be built.
    fn slice_with_tags_in_array(array: &Vec<Value>, metrics: &Vec<Value>, tag_name: &str) -> Vec<Value> {
        if tag_name.is_empty() {
            return metrics.clone();
        }
        let mut values: Vec<Value> = Vec::new();
        // `zip` guards against `metrics` being shorter than `array`
        // (indexing `metrics[idx]` here could panic).
        for (element, metric) in array.iter().zip(metrics.iter()) {
            let tag_value = element.get(tag_name).unwrap_or(&Value::Null).clone();
            let tagged = serde_json::from_value::<serde_json::Map<String, Value>>(json!({ tag_name: tag_value }));
            let body = serde_json::from_value::<serde_json::Map<String, Value>>(metric.clone());
            if let (Ok(mut tagged), Ok(mut body)) = (tagged, body) {
                tagged.append(&mut body);
                values.push(json!(tagged));
            }
        }
        if values.is_empty() {
            return metrics.clone();
        }
        values
    }

    /// For each element of `array`, descend through `fields` and push
    /// `{field: value}` at the first non-object node reached (missing keys
    /// yield Null).
    fn get_values_in_array(array: &Vec<Value>, fields: &Vec<String>) -> Vec<Value> {
        let mut values: Vec<Value> = Vec::new();
        for obj in array {
            let mut current = obj.clone();
            for field in fields {
                current = current.get(field).unwrap_or(&Value::Null).clone();
                match current {
                    // Keep descending through nested objects.
                    Value::Object(_) => continue,
                    _ => values.push(json!({ field: current.clone() })),
                }
            }
        }
        values
    }

    /// Resolve a plain dotted address; `Value::Null` when the address is
    /// empty or any segment is missing.
    fn get_metric(target: &Metric, json: &str) -> Value {
        if target.addr.is_empty() {
            return Value::Null;
        }
        let mut current: Value = serde_json::from_str(json).unwrap_or(Value::Null);
        for key in target.addr.split_terminator('.') {
            match current.get(key) {
                Some(val) => current = val.clone(),
                None => return Value::Null,
            }
        }
        current
    }
}

View File

@ -1,14 +1,17 @@
mod config;
mod net;
mod logger;
mod json;
mod export;
use anyhow::Result;
use integr_structs::api::ApiConfigV2;
use logger::setup_logger;
use log::{info, warn};
// use log::{info, warn};
use config::{pull_local_config, init_config_grub_mechanism};
use net::init_api_grub_mechanism;
use tokio::sync::mpsc;
use log::{error, info, warn};
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()>{
@ -22,10 +25,34 @@ async fn main() -> Result<()>{
let (tx, mut rx) = mpsc::channel::<ApiConfigV2>(1);
// futures
// todo : rewrite with spawn
let config_fut = init_config_grub_mechanism(&tx);
let grub_fut = init_api_grub_mechanism(config, &mut rx);
// let config_fut = init_config_grub_mechanism(&tx);
// let grub_fut = init_api_grub_mechanism(config, &mut rx);
let _ = tokio::join!(config_fut, grub_fut);
let event_config = tokio::spawn(async move {
match init_config_grub_mechanism(&tx).await {
Ok(_) => {
info!("Config task deinitialized");
},
Err(er) => {
error!("Config task returned an error : {}", er);
},
}
});
let event_grub = tokio::spawn(async move {
match init_api_grub_mechanism(config, &mut rx).await {
Ok(_) => {
info!("Grabing task deinitialized");
},
Err(er) => {
error!("Grabing task returned an error : {}", er);
},
}
});
let events_handler = vec![event_config, event_grub];
for event in events_handler {
let _ = event.await;
}
// let _ = tokio::join!(config_fut, grub_fut);
Ok(())
}

View File

@ -8,62 +8,42 @@ use reqwest::{Client, Method};
use std::sync::Arc;
use tokio::task::JoinHandle;
// use tokio::sync::Mutex;
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
use tokio_postgres::NoTls;
use dotenv::dotenv;
use std::env;
use crate::json::JsonParser;
use crate::export::Exporter;
use integr_structs::api::v3::Config;
// type BufferType = Arc<Mutex<Vec<String>>>;
struct Exporter {
pool : Option<Pool>,
}
// for api info pulling
pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiConfigV2>) -> Result<()> {
info!("Initializing API-info grubbing mechanism...");
info!("Loading vars from .env file if exists...");
let _ = dotenv().ok();
impl Exporter {
fn config_construct() -> Result<Config> {
let mut cfg = Config::new();
cfg.host = Some(env::var("DB_HOST")?);
cfg.dbname = Some(env::var("DB_DBNAME")?);
cfg.user = Some(env::var("DB_USER")?);
cfg.password = Some(env::var("DB_PASSWORD")?);
Ok(cfg)
}
fn pool_construct() -> Option<Pool> {
return match Self::config_construct() {
Ok(config) => {
if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) {
info!("Connected to PostgreSQL");
return Some(pool);
let mut config = config;
let mut poller = ApiPoll::new(&mut config).await;
let client = Exporter::init();
let shared_pool = Arc::new(client);
loop {
let shared_pool = shared_pool.clone();
if poller.is_default().await {
sleep(Duration::from_secs(5)).await;
} else {
if rx.len() > 0 {
if let Some(conf) = rx.recv().await {
poller.change_config(conf).await;
info!("Config changed");
}
None
},
Err(_) => {
error!("Bad DB credentials or it's unreachable");
None
},
}
}
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
pub fn init() -> Self {
Self {
pool : Self::pool_construct()
}
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
}
}
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
if let Some(pool) = &self.pool {
return Some(pool.get().await.ok()?);
}
None
}
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
// client.
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
let _ = client.query(&query, &[&metrics]).await?;
Ok(())
}
// Ok(())
}
struct RestMethod;
impl RestMethod {
@ -187,34 +167,6 @@ impl<'a> ApiPoll<'a> {
}
}
// for api info pulling
pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiConfigV2>) -> Result<()> {
info!("Initializing API-info grubbing mechanism...");
info!("Loading vars from .env file if exists...");
let _ = dotenv().ok();
let mut config = config;
let mut poller = ApiPoll::new(&mut config).await;
let client = Exporter::init();
let shared_pool = Arc::new(client);
loop {
let shared_pool = shared_pool.clone();
if poller.is_default().await {
sleep(Duration::from_secs(5)).await;
} else {
if rx.len() > 0 {
if let Some(conf) = rx.recv().await {
poller.change_config(conf).await;
info!("Config changed");
}
}
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
}
}
// Ok(())
}
#[cfg(test)]
mod net_unittests {
use super::*;

View File

@ -144,4 +144,96 @@ impl ProcessedEndpoint {
let val = ProcessedEndpoint::new(&keys.id, &keys.name, &keys.url, &keys.method,hm);
Ok(to_string_pretty(&val)?)
}
}
pub mod v3 {
    pub use super::*;

    /// Single metric description from the config: where to find the value
    /// (`addr`, a dotted path) and how to report it (`json_type`).
    #[derive(Deserialize)]
    pub struct Metric {
        pub id : String,
        #[serde(rename = "type")]
        pub json_type : String,
        pub addr : String,
    }

    /// A polled endpoint URL and the metrics extracted from its response.
    #[derive(Deserialize)]
    pub struct Metrics {
        pub name : String,
        pub url : String,
        #[serde(default)]
        pub measure : Vec<Metric>
    }

    /// One target API instance with its credentials and metric set.
    #[derive(Deserialize)]
    pub struct ConfigEndpoint {
        ip : String,
        login : String,
        #[serde(rename = "pass")]
        password : String,
        api_key : String,
        period : String,
        timeout : String,
        #[serde(default)]
        metrics : Vec<Metrics>,
    }

    /// Top-level v3 config: a list of endpoints. The derived `Default`
    /// (empty list) replaces the previous hand-written impl and is what
    /// `is_default` detects as "not yet loaded".
    #[derive(Deserialize, Default)]
    pub struct Config {
        config : Vec<ConfigEndpoint>,
    }

    impl Config {
        /// True while no endpoints have been loaded.
        // NOTE(review): `async` is unnecessary for this check but is kept
        // for caller compatibility.
        pub async fn is_default(&self) -> bool {
            self.config.is_empty()
        }
    }

    /// Borrowed view of an endpoint's connection credentials — references
    /// into an existing `ConfigEndpoint`, no cloning.
    pub struct Credentials<'a> {
        ip : &'a str,
        login : &'a str,
        password : &'a str,
        api_key : &'a str,
        period : &'a str,
        timeout : &'a str,
    }

    impl<'a> Credentials<'a> {
        /// Borrow the credential fields of `endpoint`.
        pub fn from_config_endpoint(endpoint: &'a ConfigEndpoint) -> Credentials<'a> {
            Self {
                ip : &endpoint.ip,
                login : &endpoint.login,
                password : &endpoint.password,
                api_key : &endpoint.api_key,
                period : &endpoint.period,
                timeout : &endpoint.timeout,
            }
        }
    }

    /// Metric in its outgoing (exporter-facing) shape.
    #[derive(Serialize, Deserialize)]
    pub struct MetricOutput {
        id : String,
        #[serde(rename = "type")]
        json_type : String,
        value : Value,
    }

    impl MetricOutput {
        /// Build an output record from borrowed id/type slices and an owned value.
        pub fn new_with_slices(id : &str, json_type : &str, value : Value) -> Self {
            MetricOutput {
                id : id.to_string(),
                json_type : json_type.to_string(),
                // field-init shorthand (was the redundant `value : value`)
                value,
            }
        }
    }
}

View File

@ -1,31 +1,32 @@
{
"id" : 1 ,
"template" :
[{
"id" :"mock_api_1",
"name" : "Mock / ",
"url" : "http://127.0.0.1:8081/",
"method" : "GET",
"measure" :
[
"operation", "response", "empty_field"
]
},
{
"config": [
{
"id" :"mock_api_2",
"name" : "Mock /ping ",
"url" : "http://127.0.0.1:8081/ping",
"method" : "GET",
"measure" :
[
"operation", "response", "empty_field"
]
}
],
"ip_address" : "127.0.0.1:8081",
"login" : "",
"pass" : "" ,
"api_key" : "908c709827bd40n98r7209837x98273",
"period" : 10,
"timeout" : 2
}
"id":"demo_vcs_vinteo_dev_api",
"login" : "",
"pass" : "",
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
"period" : "",
"timeout" : "3",
"metrics" : [
{
"name": "conferences",
"url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
"measure": [
{ "id":"number", "type": "text", "addr": "data.conferences[].number" },
{ "id":"total", "type": "integer", "addr": "data.total" },
{ "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
{ "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
{ "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
]
},
{
"name": "abonents",
"url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
"measure": [
{ "id":"total", "type": "integer", "addr": "data.total" }
]
}
]
}
]
}