Compare commits
4 Commits
784a6f7517
...
0fe75be43b
| Author | SHA1 | Date |
|---|---|---|
|
|
0fe75be43b | |
|
|
8630827118 | |
|
|
abfb21f03b | |
|
|
5dae0fe2a5 |
|
|
@ -0,0 +1,54 @@
|
|||
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
|
||||
use tokio_postgres::NoTls;
|
||||
use std::env;
|
||||
use anyhow::{Error, Result};
|
||||
use log::{info, error};
|
||||
|
||||
pub struct Exporter {
|
||||
pool : Option<Pool>,
|
||||
}
|
||||
|
||||
impl Exporter {
|
||||
fn config_construct() -> Result<Config> {
|
||||
let mut cfg = Config::new();
|
||||
cfg.host = Some(env::var("DB_HOST")?);
|
||||
cfg.dbname = Some(env::var("DB_DBNAME")?);
|
||||
cfg.user = Some(env::var("DB_USER")?);
|
||||
cfg.password = Some(env::var("DB_PASSWORD")?);
|
||||
Ok(cfg)
|
||||
}
|
||||
fn pool_construct() -> Option<Pool> {
|
||||
return match Self::config_construct() {
|
||||
Ok(config) => {
|
||||
if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) {
|
||||
info!("Connected to PostgreSQL");
|
||||
return Some(pool);
|
||||
}
|
||||
None
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Bad DB credentials or it's unreachable");
|
||||
None
|
||||
},
|
||||
}
|
||||
}
|
||||
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
||||
pub fn init() -> Self {
|
||||
Self {
|
||||
pool : Self::pool_construct()
|
||||
}
|
||||
}
|
||||
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
||||
if let Some(pool) = &self.pool {
|
||||
return Some(pool.get().await.ok()?);
|
||||
}
|
||||
None
|
||||
}
|
||||
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||
// client.
|
||||
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
||||
let _ = client.query(&query, &[&metrics]).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,153 @@
|
|||
// use serde::{de::value, Serialize};
|
||||
use serde_json::{json, Value};
|
||||
use integr_structs::api::v3::{Metric, MetricOutput};
|
||||
|
||||
pub struct JsonParser;
|
||||
|
||||
impl JsonParser {
|
||||
pub fn parse(targets: &Vec<Metric>, json: &str) -> Value {
|
||||
let mut res_vec: Vec<MetricOutput> = Vec::new();
|
||||
for target in targets {
|
||||
let metric = match target.addr.contains("[") {
|
||||
true => JsonParser::get_sum_of_metrics_in_array(target, json),
|
||||
false => JsonParser::get_metric(target, json),
|
||||
};
|
||||
res_vec.push(MetricOutput::new_with_slices(&target.id, &target.json_type, metric));
|
||||
}
|
||||
serde_json::to_value(res_vec).unwrap_or(Value::Null)
|
||||
}
|
||||
fn get_sum_of_metrics_in_array(target: &Metric, json: &str) -> Value {
|
||||
if target.addr.is_empty() {
|
||||
return Value::Null;
|
||||
}
|
||||
|
||||
let mut vec_value: Vec<Value> = Vec::new();
|
||||
let mut array_key = String::new();
|
||||
let mut value_json: Value = serde_json::from_str(json).unwrap_or(Value::Null);
|
||||
|
||||
let target_attr_vec = target.addr
|
||||
.split_terminator('.')
|
||||
.collect::<Vec<&str>>();
|
||||
// for keys in [] brackets
|
||||
let mut key_tag = String::new();
|
||||
|
||||
for (global_idx, &key) in target_attr_vec.iter().enumerate() {
|
||||
// if array
|
||||
let key_checked = if key.contains('[') {
|
||||
let key_idx = key.find("[").unwrap();
|
||||
key_tag = key.chars()
|
||||
.enumerate()
|
||||
.filter(|(idx, chr)| *idx > key_idx && *chr != ']')
|
||||
.map(|(_, chr)| chr)
|
||||
.collect::<String>();
|
||||
// dbg!(&key_tag);
|
||||
key.chars()
|
||||
.enumerate()
|
||||
.filter(|(idx, _)| *idx < key_idx)
|
||||
.map(|(_, chr)| chr)
|
||||
.collect::<String>()
|
||||
// dbg!(value_json.get(&array_key).unwrap_or(&Value::Null));
|
||||
// value_json = value_json.get(array_key).unwrap_or(&Value::Null).clone();
|
||||
// continue;
|
||||
// new_key.as_str()
|
||||
// dbg!(key); APPROVED
|
||||
// TODO: need to check key in [] type of [KEY]
|
||||
} else {key.to_owned()};
|
||||
|
||||
// if already array
|
||||
match value_json.get(key_checked) {
|
||||
Some(val) => {
|
||||
match val {
|
||||
Value::Array(array) => {
|
||||
// form new target array
|
||||
let new_array_target = target_attr_vec
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(idx, _)| *idx > global_idx)
|
||||
.map(|(_, &chr)| chr.to_owned())
|
||||
.collect::<Vec<String>>();
|
||||
// get_values_in_array
|
||||
// get_tags_in_array
|
||||
// // slice_tags_with_values
|
||||
// dbg!(&array);
|
||||
// dbg!(&new_array_target);
|
||||
let res_arr = Self::get_values_in_array(array, &new_array_target);
|
||||
if &key_tag == "" {
|
||||
return res_arr.into();
|
||||
}
|
||||
return Self::slice_with_tags_in_array(array, &res_arr, &key_tag).into()
|
||||
|
||||
},
|
||||
_ => value_json = val.clone(),
|
||||
}
|
||||
},
|
||||
None => return Value::Null,
|
||||
}
|
||||
}
|
||||
value_json
|
||||
}
|
||||
|
||||
fn slice_with_tags_in_array(array: &Vec<Value>, metrics: &Vec<Value>, tag_name: &str) -> Vec<Value> {
|
||||
if tag_name.is_empty() {
|
||||
return metrics.clone();
|
||||
}
|
||||
// array[0].as_object().unwrap_or(json!(Value::Null))
|
||||
let mut values: Vec<Value> = Vec::new();
|
||||
array.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, val)| {
|
||||
let val = val.get(tag_name).unwrap_or(&Value::Null).clone();
|
||||
(serde_json::from_value::<serde_json::Map<String, Value>>(json!({tag_name: val})),
|
||||
serde_json::from_value::<serde_json::Map<String, Value>>(metrics[idx].clone()))
|
||||
})
|
||||
.for_each(|(key, val)| {
|
||||
// dbg!(&key);
|
||||
if key.is_ok() && val.is_ok() {
|
||||
let mut key = key.unwrap();
|
||||
let mut val = val.unwrap();
|
||||
key.append(&mut val);
|
||||
values.push(json!(key));
|
||||
}
|
||||
});
|
||||
if values.len() == 0 {
|
||||
return metrics.clone();
|
||||
}
|
||||
values
|
||||
}
|
||||
fn get_values_in_array(array: &Vec<Value>, fields: &Vec<String>) -> Vec<Value> {
|
||||
let mut values: Vec<Value> = Vec::new();
|
||||
for obj in array {
|
||||
// dbg!(obj);
|
||||
let mut obrez = obj.clone();
|
||||
for field in fields {
|
||||
obrez = obrez.get(field).unwrap_or(&Value::Null).clone();
|
||||
match obrez {
|
||||
Value::Object(_) => {continue},
|
||||
_ => {
|
||||
values.push(json!({field: obrez.clone()}));
|
||||
},
|
||||
// None => {values.push(Value::Null)},
|
||||
}
|
||||
}
|
||||
}
|
||||
values
|
||||
}
|
||||
|
||||
fn get_metric(target: &Metric, json: &str) -> Value {
|
||||
if target.addr.is_empty() {
|
||||
return Value::Null;
|
||||
}
|
||||
let mut value_json: Value = serde_json::from_str(json).unwrap_or(Value::Null);
|
||||
|
||||
let target_attr_vec = target.addr
|
||||
.split_terminator('.')
|
||||
.collect::<Vec<&str>>();
|
||||
for key in target_attr_vec {
|
||||
match value_json.get(key) {
|
||||
Some(val) => value_json = val.clone(),
|
||||
None => return Value::Null,
|
||||
}
|
||||
}
|
||||
value_json
|
||||
}
|
||||
}
|
||||
|
|
@ -1,14 +1,17 @@
|
|||
mod config;
|
||||
mod net;
|
||||
mod logger;
|
||||
mod json;
|
||||
mod export;
|
||||
|
||||
use anyhow::Result;
|
||||
use integr_structs::api::ApiConfigV2;
|
||||
use logger::setup_logger;
|
||||
use log::{info, warn};
|
||||
// use log::{info, warn};
|
||||
use config::{pull_local_config, init_config_grub_mechanism};
|
||||
use net::init_api_grub_mechanism;
|
||||
use tokio::sync::mpsc;
|
||||
use log::{error, info, warn};
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
async fn main() -> Result<()>{
|
||||
|
|
@ -22,10 +25,34 @@ async fn main() -> Result<()>{
|
|||
let (tx, mut rx) = mpsc::channel::<ApiConfigV2>(1);
|
||||
// futures
|
||||
// todo : rewrite with spawn
|
||||
let config_fut = init_config_grub_mechanism(&tx);
|
||||
let grub_fut = init_api_grub_mechanism(config, &mut rx);
|
||||
// let config_fut = init_config_grub_mechanism(&tx);
|
||||
// let grub_fut = init_api_grub_mechanism(config, &mut rx);
|
||||
|
||||
let _ = tokio::join!(config_fut, grub_fut);
|
||||
let event_config = tokio::spawn(async move {
|
||||
match init_config_grub_mechanism(&tx).await {
|
||||
Ok(_) => {
|
||||
info!("Config task deinitialized");
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Config task returned an error : {}", er);
|
||||
},
|
||||
}
|
||||
});
|
||||
let event_grub = tokio::spawn(async move {
|
||||
match init_api_grub_mechanism(config, &mut rx).await {
|
||||
Ok(_) => {
|
||||
info!("Grabing task deinitialized");
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Grabing task returned an error : {}", er);
|
||||
},
|
||||
}
|
||||
});
|
||||
let events_handler = vec![event_config, event_grub];
|
||||
for event in events_handler {
|
||||
let _ = event.await;
|
||||
}
|
||||
// let _ = tokio::join!(config_fut, grub_fut);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,62 +8,42 @@ use reqwest::{Client, Method};
|
|||
use std::sync::Arc;
|
||||
use tokio::task::JoinHandle;
|
||||
// use tokio::sync::Mutex;
|
||||
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
|
||||
use tokio_postgres::NoTls;
|
||||
use dotenv::dotenv;
|
||||
use std::env;
|
||||
use crate::json::JsonParser;
|
||||
use crate::export::Exporter;
|
||||
use integr_structs::api::v3::Config;
|
||||
|
||||
// type BufferType = Arc<Mutex<Vec<String>>>;
|
||||
|
||||
struct Exporter {
|
||||
pool : Option<Pool>,
|
||||
}
|
||||
// for api info pulling
|
||||
pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiConfigV2>) -> Result<()> {
|
||||
info!("Initializing API-info grubbing mechanism...");
|
||||
info!("Loading vars from .env file if exists...");
|
||||
let _ = dotenv().ok();
|
||||
|
||||
impl Exporter {
|
||||
fn config_construct() -> Result<Config> {
|
||||
let mut cfg = Config::new();
|
||||
cfg.host = Some(env::var("DB_HOST")?);
|
||||
cfg.dbname = Some(env::var("DB_DBNAME")?);
|
||||
cfg.user = Some(env::var("DB_USER")?);
|
||||
cfg.password = Some(env::var("DB_PASSWORD")?);
|
||||
Ok(cfg)
|
||||
}
|
||||
fn pool_construct() -> Option<Pool> {
|
||||
return match Self::config_construct() {
|
||||
Ok(config) => {
|
||||
if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) {
|
||||
info!("Connected to PostgreSQL");
|
||||
return Some(pool);
|
||||
let mut config = config;
|
||||
let mut poller = ApiPoll::new(&mut config).await;
|
||||
let client = Exporter::init();
|
||||
let shared_pool = Arc::new(client);
|
||||
loop {
|
||||
let shared_pool = shared_pool.clone();
|
||||
if poller.is_default().await {
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
} else {
|
||||
if rx.len() > 0 {
|
||||
if let Some(conf) = rx.recv().await {
|
||||
poller.change_config(conf).await;
|
||||
info!("Config changed");
|
||||
}
|
||||
None
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Bad DB credentials or it's unreachable");
|
||||
None
|
||||
},
|
||||
}
|
||||
}
|
||||
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
||||
pub fn init() -> Self {
|
||||
Self {
|
||||
pool : Self::pool_construct()
|
||||
}
|
||||
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
|
||||
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
||||
}
|
||||
}
|
||||
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
||||
if let Some(pool) = &self.pool {
|
||||
return Some(pool.get().await.ok()?);
|
||||
}
|
||||
None
|
||||
}
|
||||
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||
// client.
|
||||
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
||||
let _ = client.query(&query, &[&metrics]).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Ok(())
|
||||
}
|
||||
|
||||
|
||||
struct RestMethod;
|
||||
|
||||
impl RestMethod {
|
||||
|
|
@ -187,34 +167,6 @@ impl<'a> ApiPoll<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
// for api info pulling
|
||||
pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiConfigV2>) -> Result<()> {
|
||||
info!("Initializing API-info grubbing mechanism...");
|
||||
info!("Loading vars from .env file if exists...");
|
||||
let _ = dotenv().ok();
|
||||
|
||||
let mut config = config;
|
||||
let mut poller = ApiPoll::new(&mut config).await;
|
||||
let client = Exporter::init();
|
||||
let shared_pool = Arc::new(client);
|
||||
loop {
|
||||
let shared_pool = shared_pool.clone();
|
||||
if poller.is_default().await {
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
} else {
|
||||
if rx.len() > 0 {
|
||||
if let Some(conf) = rx.recv().await {
|
||||
poller.change_config(conf).await;
|
||||
info!("Config changed");
|
||||
}
|
||||
}
|
||||
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
|
||||
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
||||
}
|
||||
}
|
||||
// Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod net_unittests {
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -144,4 +144,96 @@ impl ProcessedEndpoint {
|
|||
let val = ProcessedEndpoint::new(&keys.id, &keys.name, &keys.url, &keys.method,hm);
|
||||
Ok(to_string_pretty(&val)?)
|
||||
}
|
||||
}
|
||||
|
||||
pub mod v3 {
|
||||
pub use super::*;
|
||||
|
||||
// in config
|
||||
#[derive(Deserialize)]
|
||||
pub struct Metric {
|
||||
pub id : String,
|
||||
#[serde(rename = "type")]
|
||||
pub json_type : String,
|
||||
pub addr : String,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
pub struct Metrics {
|
||||
pub name : String,
|
||||
pub url : String,
|
||||
#[serde(default)]
|
||||
pub measure : Vec<Metric>
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct ConfigEndpoint {
|
||||
ip : String,
|
||||
login : String,
|
||||
#[serde(rename = "pass")]
|
||||
password : String,
|
||||
api_key : String,
|
||||
period : String,
|
||||
timeout : String,
|
||||
#[serde(default)]
|
||||
metrics : Vec<Metrics>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct Config {
|
||||
config : Vec<ConfigEndpoint>,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
config : Vec::new()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub async fn is_default(&self) -> bool {
|
||||
self.config.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Credentials<'a> {
|
||||
ip : &'a str,
|
||||
login : &'a str,
|
||||
password : &'a str,
|
||||
api_key : &'a str,
|
||||
period : &'a str,
|
||||
timeout : &'a str,
|
||||
}
|
||||
|
||||
impl<'a> Credentials<'a> {
|
||||
pub fn from_config_endpoint(endpoint: &'a ConfigEndpoint) -> Credentials<'a> {
|
||||
Self {
|
||||
ip : &endpoint.ip,
|
||||
login : &endpoint.login,
|
||||
password : &endpoint.password,
|
||||
api_key : &endpoint.api_key,
|
||||
period : &endpoint.period,
|
||||
timeout : &endpoint.timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// to prometheus and nmns
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct MetricOutput {
|
||||
id : String,
|
||||
#[serde(rename = "type")]
|
||||
json_type : String,
|
||||
value : Value,
|
||||
}
|
||||
impl MetricOutput {
|
||||
pub fn new_with_slices(id : &str, json_type : &str, value : Value) -> Self {
|
||||
MetricOutput {
|
||||
id : id.to_string(),
|
||||
json_type : json_type.to_string(),
|
||||
value : value,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,31 +1,32 @@
|
|||
{
|
||||
"id" : 1 ,
|
||||
"template" :
|
||||
[{
|
||||
"id" :"mock_api_1",
|
||||
"name" : "Mock / ",
|
||||
"url" : "http://127.0.0.1:8081/",
|
||||
"method" : "GET",
|
||||
"measure" :
|
||||
[
|
||||
"operation", "response", "empty_field"
|
||||
]
|
||||
},
|
||||
{
|
||||
"config": [
|
||||
{
|
||||
"id" :"mock_api_2",
|
||||
"name" : "Mock /ping ",
|
||||
"url" : "http://127.0.0.1:8081/ping",
|
||||
"method" : "GET",
|
||||
"measure" :
|
||||
[
|
||||
"operation", "response", "empty_field"
|
||||
]
|
||||
}
|
||||
],
|
||||
"ip_address" : "127.0.0.1:8081",
|
||||
"login" : "",
|
||||
"pass" : "" ,
|
||||
"api_key" : "908c709827bd40n98r7209837x98273",
|
||||
"period" : 10,
|
||||
"timeout" : 2
|
||||
}
|
||||
"id":"demo_vcs_vinteo_dev_api",
|
||||
"login" : "",
|
||||
"pass" : "",
|
||||
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
||||
"period" : "",
|
||||
"timeout" : "3",
|
||||
"metrics" : [
|
||||
{
|
||||
"name": "conferences",
|
||||
"url": "https://demo.vcs.vinteo.dev/api/v1/conferences",
|
||||
"measure": [
|
||||
{ "id":"number", "type": "text", "addr": "data.conferences[].number" },
|
||||
{ "id":"total", "type": "integer", "addr": "data.total" },
|
||||
{ "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" },
|
||||
{ "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" },
|
||||
{ "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" }
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "abonents",
|
||||
"url": "https://demo.vcs.vinteo.dev/api/v1/accounts",
|
||||
"measure": [
|
||||
{ "id":"total", "type": "integer", "addr": "data.total" }
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
Loading…
Reference in New Issue