Compare commits
14 Commits
d94b0f809b
...
208466efb4
| Author | SHA1 | Date |
|---|---|---|
|
|
208466efb4 | |
|
|
0bd9b55f87 | |
|
|
72c59dbce9 | |
|
|
70b7d41f87 | |
|
|
498bedf1f4 | |
|
|
59ab93a46e | |
|
|
efb00dcf5e | |
|
|
dd2973924a | |
|
|
d3fcfae15b | |
|
|
ab75252e6b | |
|
|
1f5656f7b8 | |
|
|
a225ec690e | |
|
|
1e65db7ca5 | |
|
|
c607bdc0d6 |
5
.env
5
.env
|
|
@ -1,3 +1,8 @@
|
||||||
CONFIG_SERVER_CREDS = "ws://127.0.0.1:8080"
|
CONFIG_SERVER_CREDS = "ws://127.0.0.1:8080"
|
||||||
API_GRUBBER_SOCKET = "api-grub.sock"
|
API_GRUBBER_SOCKET = "api-grub.sock"
|
||||||
PREPROC_SOCKET = "preproc.sock"
|
PREPROC_SOCKET = "preproc.sock"
|
||||||
|
|
||||||
|
DB_HOST = "192.168.2.37"
|
||||||
|
DB_USER = "vlad"
|
||||||
|
DB_PASSWORD = "vlad"
|
||||||
|
DB_DBNAME = "vks"
|
||||||
|
|
@ -10,6 +10,6 @@
|
||||||
| Crate (submodule) | Progress |
|
| Crate (submodule) | Progress |
|
||||||
|---|---|
|
|---|---|
|
||||||
|`api-grub` | ✅✅✅✅✅✅✅🔲🔲🔲 |
|
|`api-grub` | ✅✅✅✅✅✅✅🔲🔲🔲 |
|
||||||
|`config-delivery` | ✅✅✅✅🔲🔲🔲🔲🔲🔲 |
|
|`config-delivery` | ✅✅✅✅✅✅🔲🔲🔲🔲 |
|
||||||
|`integrs-structs` | ✅✅✅✅✅✅🔲🔲🔲🔲 |
|
|`integrs-structs` | ✅✅✅✅✅✅🔲🔲🔲🔲 |
|
||||||
|`preproc` | 🔲🔲🔲🔲🔲🔲🔲🔲🔲🔲 |
|
|`preproc` | ✅✅✅🔲🔲🔲🔲🔲🔲🔲 |
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,31 @@
|
||||||
{
|
{
|
||||||
"endpoints" : [
|
"id" : 1 ,
|
||||||
{
|
"template" :
|
||||||
"url" : "http://127.0.0.1:8081/ping",
|
[{
|
||||||
"method" : "GET"
|
"id" :"mock_api_1",
|
||||||
},
|
"name" : "Mock / ",
|
||||||
{
|
"url" : "http://127.0.0.1:8081/",
|
||||||
"url" : "http://127.0.0.1:8081/",
|
"method" : "GET",
|
||||||
"method" : "GET"
|
"measure" :
|
||||||
}
|
[
|
||||||
],
|
"operation", "response"
|
||||||
"delay" : 5
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id" :"mock_api_2",
|
||||||
|
"name" : "Mock /ping ",
|
||||||
|
"url" : "http://127.0.0.1:8081/ping",
|
||||||
|
"method" : "GET",
|
||||||
|
"measure" :
|
||||||
|
[
|
||||||
|
"operation", "response", "empty_field"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"ip_address" : "127.0.0.1:8081",
|
||||||
|
"login" : "",
|
||||||
|
"pass" : "" ,
|
||||||
|
"api_key" : "908c709827bd40n98r7209837x98273",
|
||||||
|
"period" : 10,
|
||||||
|
"timeout" : 10
|
||||||
}
|
}
|
||||||
|
|
@ -13,3 +13,6 @@ log = "0.4.25"
|
||||||
anyhow = "1.0.95"
|
anyhow = "1.0.95"
|
||||||
chrono = "0.4.39"
|
chrono = "0.4.39"
|
||||||
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
|
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
|
||||||
|
deadpool-postgres = "0.14.1"
|
||||||
|
tokio-postgres = "0.7.12"
|
||||||
|
dotenv = "0.15.0"
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
// mod to communicate with api-grub config file
|
// mod to communicate with api-grub config file
|
||||||
// 1) check changes in unix-socket
|
// 1) check changes in unix-socket
|
||||||
// 2) save changes in local config file
|
// 2) save changes in local config file
|
||||||
use integr_structs::api::ApiConfig;
|
use integr_structs::api::{ApiConfig, ApiConfigV2};
|
||||||
use anyhow::{Error, Ok, Result};
|
use anyhow::{Error, Ok, Result};
|
||||||
use log::{info, warn, error};
|
use log::{info, warn, error};
|
||||||
use std::{fs, path::Path};
|
use std::{fs, path::Path};
|
||||||
|
|
@ -15,7 +15,7 @@ const CONFIG_PATH: &str = "config_api.json";
|
||||||
const SOCKET_PATH: &str = "api-grub.sock";
|
const SOCKET_PATH: &str = "api-grub.sock";
|
||||||
|
|
||||||
// todo! rewrite to use current_exe
|
// todo! rewrite to use current_exe
|
||||||
pub async fn pull_local_config() -> Result<ApiConfig> {
|
pub async fn pull_local_config() -> Result<ApiConfigV2> {
|
||||||
// let conf_path = std::env::current_exe()?;
|
// let conf_path = std::env::current_exe()?;
|
||||||
let path = Path::new(CONFIG_PATH);
|
let path = Path::new(CONFIG_PATH);
|
||||||
// return match conf_path.parent() {
|
// return match conf_path.parent() {
|
||||||
|
|
@ -28,7 +28,7 @@ pub async fn pull_local_config() -> Result<ApiConfig> {
|
||||||
// None => Err(Error::msg("No local conf was found"))
|
// None => Err(Error::msg("No local conf was found"))
|
||||||
// }
|
// }
|
||||||
if path.exists() && path.is_file() {
|
if path.exists() && path.is_file() {
|
||||||
let config: ApiConfig = from_str(
|
let config: ApiConfigV2 = from_str(
|
||||||
&fs::read_to_string(CONFIG_PATH)?
|
&fs::read_to_string(CONFIG_PATH)?
|
||||||
)?;
|
)?;
|
||||||
Ok(config)
|
Ok(config)
|
||||||
|
|
@ -39,7 +39,7 @@ pub async fn pull_local_config() -> Result<ApiConfig> {
|
||||||
|
|
||||||
// for config pulling
|
// for config pulling
|
||||||
// ++++ reader to channel
|
// ++++ reader to channel
|
||||||
pub async fn init_config_grub_mechanism(tx: &Sender<ApiConfig>) -> Result<()> {
|
pub async fn init_config_grub_mechanism(tx: &Sender<ApiConfigV2>) -> Result<()> {
|
||||||
info!("Initializing Unix-Socket listening for pulling new configs...");
|
info!("Initializing Unix-Socket listening for pulling new configs...");
|
||||||
let server = init_unix_listener().await?;
|
let server = init_unix_listener().await?;
|
||||||
//
|
//
|
||||||
|
|
@ -51,7 +51,7 @@ pub async fn init_config_grub_mechanism(tx: &Sender<ApiConfig>) -> Result<()> {
|
||||||
if let Err(er) = stream.read_to_string(&mut buffer).await {
|
if let Err(er) = stream.read_to_string(&mut buffer).await {
|
||||||
warn!("Cannot read config from stream due to {}", er);
|
warn!("Cannot read config from stream due to {}", er);
|
||||||
} else {
|
} else {
|
||||||
let config: Result<ApiConfig, serde_json::Error> = from_str(&buffer);
|
let config: Result<ApiConfigV2, serde_json::Error> = from_str(&buffer);
|
||||||
if let stdOk(conf) = config {
|
if let stdOk(conf) = config {
|
||||||
info!("New config was pulled from Unix-Stream. Saving it locally and sharing with API-grub module...");
|
info!("New config was pulled from Unix-Stream. Saving it locally and sharing with API-grub module...");
|
||||||
if let Err(er) = save_new_config(&buffer).await {
|
if let Err(er) = save_new_config(&buffer).await {
|
||||||
|
|
@ -97,13 +97,13 @@ mod config_unittests {
|
||||||
#[test]
|
#[test]
|
||||||
async fn check_save_new_config() {
|
async fn check_save_new_config() {
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use integr_structs::api::ApiConfig;
|
use integr_structs::api::ApiConfigV2;
|
||||||
use serde_json::to_string;
|
use serde_json::to_string;
|
||||||
|
|
||||||
let test_config_path = "test_config_api.json";
|
let test_config_path = "test_config_api.json";
|
||||||
|
|
||||||
// config gen
|
// config gen
|
||||||
let config = to_string::<ApiConfig>(&ApiConfig::default());
|
let config = to_string::<ApiConfigV2>(&ApiConfigV2::default());
|
||||||
assert!(config.is_ok());
|
assert!(config.is_ok());
|
||||||
let config = config.unwrap();
|
let config = config.unwrap();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ mod net;
|
||||||
mod logger;
|
mod logger;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use integr_structs::api::ApiConfig;
|
use integr_structs::api::ApiConfigV2;
|
||||||
use logger::setup_logger;
|
use logger::setup_logger;
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use config::{pull_local_config, init_config_grub_mechanism};
|
use config::{pull_local_config, init_config_grub_mechanism};
|
||||||
|
|
@ -19,8 +19,9 @@ async fn main() -> Result<()>{
|
||||||
setup_logger().await?;
|
setup_logger().await?;
|
||||||
let config = get_config().await;
|
let config = get_config().await;
|
||||||
// config update channel
|
// config update channel
|
||||||
let (tx, mut rx) = mpsc::channel::<ApiConfig>(1);
|
let (tx, mut rx) = mpsc::channel::<ApiConfigV2>(1);
|
||||||
// futures
|
// futures
|
||||||
|
// todo : rewrite with spawn
|
||||||
let config_fut = init_config_grub_mechanism(&tx);
|
let config_fut = init_config_grub_mechanism(&tx);
|
||||||
let grub_fut = init_api_grub_mechanism(config, &mut rx);
|
let grub_fut = init_api_grub_mechanism(config, &mut rx);
|
||||||
|
|
||||||
|
|
@ -29,7 +30,7 @@ async fn main() -> Result<()>{
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_config() -> ApiConfig {
|
async fn get_config() -> ApiConfigV2 {
|
||||||
return match pull_local_config().await {
|
return match pull_local_config().await {
|
||||||
Ok(conf) => {
|
Ok(conf) => {
|
||||||
info!("Local config was loaded");
|
info!("Local config was loaded");
|
||||||
|
|
@ -37,7 +38,7 @@ async fn get_config() -> ApiConfig {
|
||||||
},
|
},
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
warn!("Cannot get local config due to {}", er);
|
warn!("Cannot get local config due to {}", er);
|
||||||
ApiConfig::default()
|
ApiConfigV2::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1,10 +1,70 @@
|
||||||
// module to handle unix-socket connection + pulling info from api
|
// module to handle unix-socket connection + pulling info from api
|
||||||
use anyhow::{Error, Result};
|
use anyhow::{Error, Result};
|
||||||
use integr_structs::api::ApiConfig;
|
use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
|
use serde_json::Value;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use reqwest::{Client, Method};
|
use reqwest::{Client, Method};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
// use tokio::sync::Mutex;
|
||||||
|
use tokio_postgres::types::ToSql;
|
||||||
|
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
|
||||||
|
use tokio_postgres::NoTls;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use std::env;
|
||||||
|
|
||||||
|
// type BufferType = Arc<Mutex<Vec<String>>>;
|
||||||
|
|
||||||
|
struct Exporter {
|
||||||
|
pool : Option<Pool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Exporter {
|
||||||
|
fn config_construct() -> Result<Config> {
|
||||||
|
let mut cfg = Config::new();
|
||||||
|
cfg.host = Some(env::var("DB_HOST")?);
|
||||||
|
cfg.dbname = Some(env::var("DB_DBNAME")?);
|
||||||
|
cfg.user = Some(env::var("DB_USER")?);
|
||||||
|
cfg.password = Some(env::var("DB_PASSWORD")?);
|
||||||
|
Ok(cfg)
|
||||||
|
}
|
||||||
|
fn pool_construct() -> Option<Pool> {
|
||||||
|
return match Self::config_construct() {
|
||||||
|
Ok(config) => {
|
||||||
|
if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) {
|
||||||
|
info!("Connected to PostgreSQL");
|
||||||
|
return Some(pool);
|
||||||
|
}
|
||||||
|
None
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Bad DB credentials or it's unreachable");
|
||||||
|
None
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
||||||
|
pub fn init() -> Self {
|
||||||
|
Self {
|
||||||
|
pool : Self::pool_construct()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
||||||
|
if let Some(pool) = &self.pool {
|
||||||
|
return Some(pool.get().await.ok()?);
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||||
|
// client.
|
||||||
|
let query = "INSERT INTO metrics (body) VALUES ($1);";
|
||||||
|
let _ = client.execute(query, &[&query]).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
struct RestMethod;
|
struct RestMethod;
|
||||||
|
|
||||||
|
|
@ -15,69 +75,113 @@ impl RestMethod {
|
||||||
"patch" => Method::PATCH,
|
"patch" => Method::PATCH,
|
||||||
"put" => Method::PUT,
|
"put" => Method::PUT,
|
||||||
"delete" => Method::DELETE,
|
"delete" => Method::DELETE,
|
||||||
|
"head" => Method::HEAD,
|
||||||
|
"trace" => Method::TRACE,
|
||||||
|
"options" => Method::OPTIONS,
|
||||||
|
"connect" => Method::CONNECT,
|
||||||
"get" | _ => Method::GET
|
"get" | _ => Method::GET
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
struct ApiPoll<'a> {
|
struct ApiPoll<'a> {
|
||||||
config : &'a mut ApiConfig,
|
config : &'a mut ApiConfigV2,
|
||||||
client : Client,
|
client : Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ApiPoll<'a> {
|
impl<'a> ApiPoll<'a> {
|
||||||
pub async fn new(poll_cfg : &'a mut ApiConfig) -> Self {
|
pub async fn new(poll_cfg : &'a mut ApiConfigV2) -> Self {
|
||||||
Self {
|
Self {
|
||||||
config : poll_cfg,
|
config : poll_cfg,
|
||||||
client : Client::new(),
|
client : Client::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// can be weak and with bug test needed
|
// can be weak and with bug test needed
|
||||||
pub async fn change_config(&mut self, conf: ApiConfig) {
|
pub async fn change_config(&mut self, conf: ApiConfigV2) {
|
||||||
*self.config = conf;
|
*self.config = conf;
|
||||||
}
|
}
|
||||||
pub async fn is_default(&self) -> bool {
|
pub async fn is_default(&self) -> bool {
|
||||||
self.config.endpoints.len() == 0
|
self.config.template.len() == 0
|
||||||
}
|
}
|
||||||
pub async fn process_polling(&self) -> Result<Vec<String>> {
|
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
|
||||||
let mut buffer: Vec<String> = vec![];
|
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
|
||||||
|
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
|
let client = Arc::new(self.client.clone());
|
||||||
|
let template = Arc::new(self.config.template.clone());
|
||||||
|
|
||||||
// TODO: rewrite nextly to async
|
// TODO: rewrite nextly to async
|
||||||
for point in &self.config.endpoints {
|
for point in template.iter() {
|
||||||
// let a = self.client.get(&point.url).send().await.unwrap();
|
let point = Arc::new(point.clone());
|
||||||
// a.text().await.unwrap();
|
// let buffer = buffer.clone();
|
||||||
match self.client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
|
let client = client.clone();
|
||||||
Ok(resp) => {
|
let exporter = exporter.clone();
|
||||||
if !resp.status().is_success() {
|
let endpoint_processer = tokio::spawn(async move {
|
||||||
error!("ErrorCode in Response from API. Check configuration");
|
let point = point.clone();
|
||||||
continue;
|
match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
|
||||||
}
|
Ok(resp) => {
|
||||||
if let Ok(text) = resp.text().await {
|
if !resp.status().is_success() {
|
||||||
info!("{}: {} - Successfull grubbing info", &point.method.to_uppercase(), &point.url);
|
error!("ErrorCode in Response from API. Check configuration");
|
||||||
buffer.push(text);
|
return Err(Error::msg("Error during sending request"));
|
||||||
} else {
|
}
|
||||||
error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
|
if let Ok(text) = resp.text().await {
|
||||||
}
|
//
|
||||||
},
|
let metrics = ProcessedEndpoint::from_target_response(&text, &point)?;
|
||||||
Err(er) => {
|
//
|
||||||
error!("{}: {} - Query crushed due to {}", &point.method.to_uppercase(), &point.url, er);
|
if let Some(conn) = exporter.get_connection_from_pool().await {
|
||||||
},
|
if let Err(er) = Exporter::export_data(conn, &metrics).await {
|
||||||
}
|
error!("Cannot export data to DB during to: {}", er);
|
||||||
|
return Err(Error::msg("Error during exporting data to DB"));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if !exporter.is_no_connection() {
|
||||||
|
return Err(Error::msg("Error during getting connection from pool"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// let mut buffer = buffer.lock().await;
|
||||||
|
// buffer.push(text);
|
||||||
|
} else {
|
||||||
|
error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
join_handles.push(endpoint_processer);
|
||||||
}
|
}
|
||||||
match &buffer.len() {
|
|
||||||
0 => Err(Error::msg("Error due to API grubbing. Check config" )),
|
for i in join_handles {
|
||||||
_ => Ok(buffer),
|
let _ = i.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// let buffer = buffer.lock().await;
|
||||||
|
// match &buffer.len() {
|
||||||
|
// 0 => Err(Error::msg("Error due to API grubbing. Check config" )),
|
||||||
|
// _ => {
|
||||||
|
// Ok(())
|
||||||
|
// },
|
||||||
|
// }
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn get_delay(&self) -> u32 {
|
pub async fn get_delay(&self) -> u32 {
|
||||||
self.config.delay
|
self.config.timeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// for api info pulling
|
// for api info pulling
|
||||||
pub async fn init_api_grub_mechanism(config: ApiConfig, rx: &mut Receiver<ApiConfig>) -> Result<()> {
|
pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiConfigV2>) -> Result<()> {
|
||||||
info!("Initializing API-info grubbing mechanism...");
|
info!("Initializing API-info grubbing mechanism...");
|
||||||
|
info!("Loading vars from .env file if exists...");
|
||||||
|
let _ = dotenv().ok();
|
||||||
|
|
||||||
let mut config = config;
|
let mut config = config;
|
||||||
let mut poller = ApiPoll::new(&mut config).await;
|
let mut poller = ApiPoll::new(&mut config).await;
|
||||||
|
let client = Exporter::init();
|
||||||
|
let shared_pool = Arc::new(client);
|
||||||
loop {
|
loop {
|
||||||
|
let shared_pool = shared_pool.clone();
|
||||||
if poller.is_default().await {
|
if poller.is_default().await {
|
||||||
sleep(Duration::from_secs(5)).await;
|
sleep(Duration::from_secs(5)).await;
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -87,7 +191,7 @@ pub async fn init_api_grub_mechanism(config: ApiConfig, rx: &mut Receiver<ApiCon
|
||||||
info!("Config changed");
|
info!("Config changed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("Data from API: {:?}", poller.process_polling().await);
|
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
|
||||||
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -98,7 +202,6 @@ pub async fn init_api_grub_mechanism(config: ApiConfig, rx: &mut Receiver<ApiCon
|
||||||
mod net_unittests {
|
mod net_unittests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use tokio::test;
|
use tokio::test;
|
||||||
use integr_structs::api::{ApiConfig, ApiEndpoint};
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
async fn check_str_to_rest_method() {
|
async fn check_str_to_rest_method() {
|
||||||
|
|
@ -111,16 +214,16 @@ mod net_unittests {
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
async fn check_api_poll_change_config() {
|
async fn check_api_poll_change_config() {
|
||||||
let mut conf1 = ApiConfig::default();
|
let mut conf1 = ApiConfigV2::default();
|
||||||
let conf2 = ApiConfig { endpoints : vec![], delay : 10, };
|
let conf2 = ApiConfigV2::pattern();
|
||||||
let mut poll = ApiPoll::new(&mut conf1).await;
|
let mut poll = ApiPoll::new(&mut conf1).await;
|
||||||
poll.change_config(conf2).await;
|
poll.change_config(conf2).await;
|
||||||
assert_eq!(poll.config.delay, 10)
|
assert_eq!(poll.config.timeout, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
async fn check_api_poll_is_default() {
|
async fn check_api_poll_is_default() {
|
||||||
let mut conf1 = ApiConfig::default();
|
let mut conf1 = ApiConfigV2::default();
|
||||||
let poll = ApiPoll::new(&mut conf1).await;
|
let poll = ApiPoll::new(&mut conf1).await;
|
||||||
assert!(poll.is_default().await)
|
assert!(poll.is_default().await)
|
||||||
}
|
}
|
||||||
|
|
@ -130,20 +233,14 @@ mod net_unittests {
|
||||||
use log::{set_max_level, LevelFilter};
|
use log::{set_max_level, LevelFilter};
|
||||||
|
|
||||||
set_max_level(LevelFilter::Off);
|
set_max_level(LevelFilter::Off);
|
||||||
let mut conf1 = ApiConfig {
|
let mut conf1 = ApiConfigV2::pattern();
|
||||||
endpoints : vec![
|
let conf2 = ApiConfigV2::default();
|
||||||
ApiEndpoint {
|
let exporter = Arc::new(Exporter::init());
|
||||||
url : String::from("https://dummy-json.mock.beeceptor.com/countries"),
|
|
||||||
method: String::from("get"),
|
|
||||||
}],
|
|
||||||
delay : 10,
|
|
||||||
};
|
|
||||||
let conf2 = ApiConfig::default();
|
|
||||||
|
|
||||||
let mut poll = ApiPoll::new(&mut conf1).await;
|
let mut poll = ApiPoll::new(&mut conf1).await;
|
||||||
assert!(poll.process_polling().await.is_ok());
|
assert!(poll.process_polling(exporter.clone()).await.is_ok());
|
||||||
|
|
||||||
poll.change_config(conf2).await;
|
poll.change_config(conf2).await;
|
||||||
assert!(poll.process_polling().await.is_err());
|
assert!(poll.process_polling(exporter.clone()).await.is_err());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -2,17 +2,46 @@
|
||||||
// using Unix-Socket Client
|
// using Unix-Socket Client
|
||||||
|
|
||||||
use anyhow::{Error, Result};
|
use anyhow::{Error, Result};
|
||||||
use integr_structs::api::ApiConfig;
|
use integr_structs::api::ApiConfigV2;
|
||||||
use tokio::time::{sleep, Instant};
|
use tokio::time::{sleep, Duration};
|
||||||
use tokio::net::UnixStream;
|
use tokio::net::UnixStream;
|
||||||
use std::env;
|
use std::env;
|
||||||
|
use tokio::sync::mpsc::Receiver;
|
||||||
|
use log::{info, error};
|
||||||
|
use serde_json::to_string_pretty;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
enum UnixSocketConsumer {
|
enum UnixSocketConsumer {
|
||||||
ApiGrubber,
|
ApiGrubber,
|
||||||
Preproc,
|
Preproc,
|
||||||
}
|
}
|
||||||
// to create us-client
|
// to create us-client
|
||||||
struct UnixSocketClient;
|
struct UnixSocketClient;
|
||||||
|
// to catch new configs from integr submod
|
||||||
|
type ConfigGateway = Receiver<ApiConfigV2>;
|
||||||
|
|
||||||
|
pub async fn init_delivery_mech(rx: ConfigGateway) -> Result<()> {
|
||||||
|
let mut rx = rx;
|
||||||
|
loop {
|
||||||
|
while rx.is_empty() {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
info!("New config was pulled. Redirecting it ...");
|
||||||
|
let config = rx.recv().await.unwrap();
|
||||||
|
match UnixSocketConsumer::ApiGrubber.get_stream_object().await {
|
||||||
|
Ok(socket) => {
|
||||||
|
socket.try_write(to_string_pretty(&config)?.as_bytes())?;
|
||||||
|
info!("New api config was pulled");
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Cannot coonect to ApiGrubber Unix-Socket due to {er}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
}
|
||||||
|
// let api_sock = UnixSocketConsumer::ApiGrubber.get_stream_object().await?;
|
||||||
|
// let preproc_sock = UnixSocketConsumer::Preproc.get_stream_object().await?;
|
||||||
|
}
|
||||||
|
|
||||||
impl UnixSocketConsumer {
|
impl UnixSocketConsumer {
|
||||||
pub async fn get_stream_object(&self) -> Result<UnixStream> {
|
pub async fn get_stream_object(&self) -> Result<UnixStream> {
|
||||||
|
|
@ -20,16 +49,17 @@ impl UnixSocketConsumer {
|
||||||
UnixSocketConsumer::ApiGrubber => env::var("API_GRUBBER_SOCKET")?,
|
UnixSocketConsumer::ApiGrubber => env::var("API_GRUBBER_SOCKET")?,
|
||||||
UnixSocketConsumer::Preproc => env::var("PREPROC_SOCKET")?,
|
UnixSocketConsumer::Preproc => env::var("PREPROC_SOCKET")?,
|
||||||
};
|
};
|
||||||
UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg("Cannot create Unix-Socket client")))
|
UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg(format!("Cannot create {:?} Unix-Socket client", self))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn check_unix_socket_file(path: &str) -> bool {
|
// async fn check_unix_socket_file(path: &str) -> bool {
|
||||||
std::path::Path::new(path).exists()
|
// std::path::Path::new(path).exists()
|
||||||
}
|
// }
|
||||||
|
|
||||||
#[cfg(tests)]
|
#[cfg(test)]
|
||||||
mod delivery_unittests {
|
mod delivery_unittests {
|
||||||
|
#[allow(unused_imports)]
|
||||||
use super::*;
|
use super::*;
|
||||||
use tokio::test;
|
use tokio::test;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use integr_structs::api::ApiConfigV2;
|
||||||
|
use anyhow::Result;
|
||||||
|
use tokio::time::{sleep, Duration};
|
||||||
|
// mock
|
||||||
|
|
||||||
|
type Worker = Arc<Sender<ApiConfigV2>>;
|
||||||
|
|
||||||
|
#[allow(dead_code, unused_variables)]
|
||||||
|
pub async fn init_integration_mech(tx: Worker) -> Result<()> {
|
||||||
|
loop {
|
||||||
|
sleep(Duration::from_secs(5)).await;
|
||||||
|
let conf = ApiConfigV2::default();
|
||||||
|
tx.send(conf).await?;
|
||||||
|
}
|
||||||
|
// Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -2,11 +2,15 @@ mod delivery;
|
||||||
mod integration;
|
mod integration;
|
||||||
mod logger;
|
mod logger;
|
||||||
|
|
||||||
|
use integr_structs::api::ApiConfigV2;
|
||||||
use logger::setup_logger;
|
use logger::setup_logger;
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use log::info;
|
use log::{info, error};
|
||||||
|
use integration::init_integration_mech;
|
||||||
|
use delivery::init_delivery_mech;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
// Arch :
|
// Arch :
|
||||||
// 1) 2 Unix-Socket client (for api grub and preproc) :: i think its a continious process for events when services are unavailable
|
// 1) 2 Unix-Socket client (for api grub and preproc) :: i think its a continious process for events when services are unavailable
|
||||||
|
|
@ -16,10 +20,27 @@ use log::info;
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
let _ = setup_logger().await?;
|
let _ = setup_logger().await?;
|
||||||
dotenv().ok();
|
|
||||||
info!("Pulling env vars from .env file if exists ...");
|
info!("Pulling env vars from .env file if exists ...");
|
||||||
|
dotenv().ok();
|
||||||
|
|
||||||
//
|
let (tx, rx) = mpsc::channel::<ApiConfigV2>(1);
|
||||||
|
let tx = Arc::new(tx);
|
||||||
|
let integr_jh = tokio::spawn(
|
||||||
|
async move {
|
||||||
|
if let Err(er) = init_integration_mech(tx).await {
|
||||||
|
error!("Error in integration submodule during to: {}", er);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let deliv_future = tokio::spawn(async move {
|
||||||
|
if let Err(er) = init_delivery_mech(rx).await {
|
||||||
|
error!("Error in delivery submodule during to: {}", er);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// for deliv adding
|
||||||
|
// let action_vec: Vec<JoinHandle<()>> = vec![integr_jh, deliv_future];
|
||||||
|
for event in vec![integr_jh, deliv_future] {
|
||||||
|
let _ = event.await;
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,5 +4,6 @@ version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
anyhow = "1.0.95"
|
||||||
serde = { version = "1.0.217", features = ["derive"] }
|
serde = { version = "1.0.217", features = ["derive"] }
|
||||||
serde_json = "1.0.135"
|
serde_json = "1.0.135"
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,7 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
|
use serde_json::{ to_string_pretty, Value };
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
|
@ -22,3 +25,123 @@ impl Default for ApiConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// v2
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct ApiConfigV2 {
|
||||||
|
pub id : u64,
|
||||||
|
#[serde(default)]
|
||||||
|
pub template : Vec<Template>,
|
||||||
|
pub ip_address : String,
|
||||||
|
pub login : Option<String>,
|
||||||
|
pub pass : Option<String>,
|
||||||
|
pub api_key : Option<String>,
|
||||||
|
pub period : u32, // if "0" -> inf
|
||||||
|
pub timeout : u32, // if "0" -> no-delay
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ApiConfigV2 {
|
||||||
|
fn default() -> Self {
|
||||||
|
ApiConfigV2 {
|
||||||
|
id : 0,
|
||||||
|
template : Vec::new(),
|
||||||
|
ip_address : String::from("no_ip"),
|
||||||
|
login : None,
|
||||||
|
pass : None,
|
||||||
|
api_key : None,
|
||||||
|
period : 0,
|
||||||
|
timeout : 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl ApiConfigV2 {
|
||||||
|
pub fn template() -> Self {
|
||||||
|
ApiConfigV2 {
|
||||||
|
id : 1111,
|
||||||
|
template : Vec::new(),
|
||||||
|
ip_address : String::from("ip"),
|
||||||
|
login : None,
|
||||||
|
pass : None,
|
||||||
|
api_key : None,
|
||||||
|
period : 1111,
|
||||||
|
timeout : 1111,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn pattern() -> Self {
|
||||||
|
ApiConfigV2 {
|
||||||
|
id : 1111,
|
||||||
|
template : vec![
|
||||||
|
Template {
|
||||||
|
id : String::from("no id"),
|
||||||
|
name : String::from("open api"),
|
||||||
|
url : String::from("https://dummy-json.mock.beeceptor.com/countries"),
|
||||||
|
method : String::from("GET"),
|
||||||
|
measure : Vec::new(),
|
||||||
|
}
|
||||||
|
],
|
||||||
|
ip_address : String::from("ip"),
|
||||||
|
login : None,
|
||||||
|
pass : None,
|
||||||
|
api_key : None,
|
||||||
|
period : 1,
|
||||||
|
timeout : 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct Template {
|
||||||
|
pub id : String,
|
||||||
|
pub name : String,
|
||||||
|
pub url : String,
|
||||||
|
pub method : String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub measure : Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Template {
|
||||||
|
fn default() -> Self {
|
||||||
|
Template {
|
||||||
|
id : String::from("no-id"),
|
||||||
|
name : String::from("no-name"),
|
||||||
|
url : String::from("no-url"),
|
||||||
|
method : String::from("no-method"),
|
||||||
|
measure : Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct ProcessedEndpoint {
|
||||||
|
id : String,
|
||||||
|
name : String,
|
||||||
|
url : String,
|
||||||
|
method : String,
|
||||||
|
#[serde(default)]
|
||||||
|
metrics : HashMap<String, Value>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProcessedEndpoint {
|
||||||
|
pub fn new(id: &str, name: &str, url: &str, method: &str, metrics: HashMap<String, Value>) -> Self {
|
||||||
|
ProcessedEndpoint {
|
||||||
|
id : id.to_owned(),
|
||||||
|
name : name.to_owned(),
|
||||||
|
url : url.to_owned(),
|
||||||
|
method : method.to_owned(),
|
||||||
|
metrics : metrics,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn from_target_response(response: &str, keys: &Template) -> Result<String> {
|
||||||
|
let mut hm: HashMap<String, Value> = HashMap::new();
|
||||||
|
let mut response: Value = serde_json::from_str(response)?;
|
||||||
|
|
||||||
|
let _ = keys.measure.iter()
|
||||||
|
.map(|key| (key, response[key].take()))
|
||||||
|
.for_each(|(key, value)| {
|
||||||
|
hm.insert(key.clone(), value);
|
||||||
|
});
|
||||||
|
let val = ProcessedEndpoint::new(&keys.id, &keys.name, &keys.url, &keys.method,hm);
|
||||||
|
Ok(to_string_pretty(&val)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -4,3 +4,11 @@ version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
anyhow = "1.0.95"
|
||||||
|
chrono = "0.4.39"
|
||||||
|
dotenv = "0.15.0"
|
||||||
|
env_logger = "0.11.6"
|
||||||
|
log = "0.4.25"
|
||||||
|
serde = { version = "1.0.217", features = ["derive"] }
|
||||||
|
serde_json = "1.0.137"
|
||||||
|
tokio = { version = "1.43.0", features = ["full"] }
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
// mod for prpeproc config pulling and updating
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod config_unittests {
|
||||||
|
use tokio::test;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
async fn create_unix_socket_server() { assert!(true) }
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
async fn verify_on_valid_config() { assert!(true) }
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
async fn verify_on_invalid_config() { assert!(true) }
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,50 @@
|
||||||
|
use chrono::Local;
|
||||||
|
use env_logger::Builder;
|
||||||
|
use log::LevelFilter;
|
||||||
|
use std::io::Write;
|
||||||
|
use anyhow::Result;
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
pub async fn setup_logger() -> Result<()> {
|
||||||
|
Builder::new()
|
||||||
|
.format(move |buf, record| {
|
||||||
|
writeln!(
|
||||||
|
buf,
|
||||||
|
"|{}| {} [{}] - {}",
|
||||||
|
"config-delivery",
|
||||||
|
Local::now().format("%d-%m-%Y %H:%M:%S"),
|
||||||
|
record.level(),
|
||||||
|
record.args(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.filter(None, LevelFilter::Info)
|
||||||
|
.target(env_logger::Target::Stdout)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
info!("Logger configured");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod logger_unittests {
|
||||||
|
use tokio::test;
|
||||||
|
use super::*;
|
||||||
|
#[test]
|
||||||
|
async fn check_logger_builder() {
|
||||||
|
Builder::new()
|
||||||
|
.format(move |buf, record| {
|
||||||
|
writeln!(
|
||||||
|
buf,
|
||||||
|
"|{}| {} [{}] - {}",
|
||||||
|
"config-delivery",
|
||||||
|
Local::now().format("%d-%m-%Y %H:%M:%S"),
|
||||||
|
record.level(),
|
||||||
|
record.args(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.filter(None, LevelFilter::Info)
|
||||||
|
.target(env_logger::Target::Stdout)
|
||||||
|
.init();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,3 +1,18 @@
|
||||||
fn main() {
|
mod config;
|
||||||
println!("Hello, world!");
|
mod transform;
|
||||||
|
mod logger;
|
||||||
|
|
||||||
|
use logger::setup_logger;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use anyhow::Result;
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
|
async fn main() -> Result<()>{
|
||||||
|
let _ = setup_logger().await?;
|
||||||
|
|
||||||
|
info!("Pulling env vars from .env file if exists ...");
|
||||||
|
dotenv().ok();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
// mod for preproccessing and transfering to the CM metrics data
|
||||||
|
|
@ -0,0 +1,31 @@
|
||||||
|
{
|
||||||
|
"id" : 1 ,
|
||||||
|
"template" :
|
||||||
|
[{
|
||||||
|
"id" :"mock_api_1",
|
||||||
|
"name" : "Mock / ",
|
||||||
|
"url" : "http://127.0.0.1:8081/",
|
||||||
|
"method" : "GET",
|
||||||
|
"measure" :
|
||||||
|
[
|
||||||
|
"operation", "response", "empty_field"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id" :"mock_api_2",
|
||||||
|
"name" : "Mock /ping ",
|
||||||
|
"url" : "http://127.0.0.1:8081/ping",
|
||||||
|
"method" : "GET",
|
||||||
|
"measure" :
|
||||||
|
[
|
||||||
|
"operation", "response", "empty_field"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"ip_address" : "127.0.0.1:8081",
|
||||||
|
"login" : "",
|
||||||
|
"pass" : "" ,
|
||||||
|
"api_key" : "908c709827bd40n98r7209837x98273",
|
||||||
|
"period" : 10,
|
||||||
|
"timeout" : 2
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue