alomost done
parent
c7bc0d6ff5
commit
847fd1595d
|
|
@ -6,7 +6,7 @@
|
||||||
"pass" : "",
|
"pass" : "",
|
||||||
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
||||||
"period" : "",
|
"period" : "",
|
||||||
"timeout" : "3",
|
"timeout" : "10",
|
||||||
"metrics" : [
|
"metrics" : [
|
||||||
{
|
{
|
||||||
"name": "conferences",
|
"name": "conferences",
|
||||||
|
|
|
||||||
|
|
@ -2,16 +2,17 @@
|
||||||
use anyhow::{Error, Result};
|
use anyhow::{Error, Result};
|
||||||
use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
|
use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
|
use serde_json::Value;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use reqwest::{Client, Method};
|
use reqwest::{Client, Method, RequestBuilder};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
// use tokio::sync::Mutex;
|
// use tokio::sync::Mutex;
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use crate::json::JsonParser;
|
use crate::json::JsonParser;
|
||||||
use crate::export::{self, Exporter};
|
use crate::export::{self, Exporter};
|
||||||
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials};
|
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics};
|
||||||
|
|
||||||
// type BufferType = Arc<Mutex<Vec<String>>>;
|
// type BufferType = Arc<Mutex<Vec<String>>>;
|
||||||
|
|
||||||
|
|
@ -85,19 +86,127 @@ impl<'a> ApiPoll<'a> {
|
||||||
// pub async fn get_delay(&self) -> u32 {
|
// pub async fn get_delay(&self) -> u32 {
|
||||||
// self.config.timeout
|
// self.config.timeout
|
||||||
// }
|
// }
|
||||||
|
pub async fn process_metrics(
|
||||||
|
service_id: Arc<String>,
|
||||||
|
metrics: Arc<Metrics>,
|
||||||
|
creds: Credentials,
|
||||||
|
// exporter: Arc<Exporter>
|
||||||
|
) -> Result<()> {
|
||||||
|
// processing metrics
|
||||||
|
let mut req = Client::new()
|
||||||
|
.get(&metrics.url);
|
||||||
|
let login = creds.endpoint.login.clone();
|
||||||
|
let password = creds.endpoint.password.clone();
|
||||||
|
let api_key = creds.endpoint.api_key.clone();
|
||||||
|
if !login.is_empty() && !password.is_empty() {
|
||||||
|
dbg!("kjgbkasgksjd");
|
||||||
|
req = req.basic_auth(login, Some(password));
|
||||||
|
}
|
||||||
|
if !api_key.is_empty() {
|
||||||
|
req = req.bearer_auth(&api_key);
|
||||||
|
// req = req.header("authorization", "bearer ");
|
||||||
|
|
||||||
|
req = req.header("accept", "application/json");
|
||||||
|
req = req.header("x-api-key", &api_key);
|
||||||
|
|
||||||
|
// req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
|
||||||
|
}
|
||||||
|
dbg!(&req);
|
||||||
|
// let (client, res) = req.build_split();
|
||||||
|
// let res = res.unwrap();
|
||||||
|
// res.url_mut().is_special()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// dbg!(client);
|
||||||
|
// dbg!(res);
|
||||||
|
// todo!();
|
||||||
|
|
||||||
|
match req.send().await {
|
||||||
|
Ok(resp) => {
|
||||||
|
dbg!(&resp);
|
||||||
|
if let Ok(response) = resp.text().await {
|
||||||
|
match serde_json::from_str::<Value>(&response) {
|
||||||
|
Err(er) => {error!("Bad JSON in response. Error: {}", er);},
|
||||||
|
Ok(_) => {
|
||||||
|
let endpoint_name = &metrics.name;
|
||||||
|
let preproc = JsonParser::parse(&metrics.measure, &response);
|
||||||
|
let metrics: String = serde_json::from_value(preproc.clone())
|
||||||
|
.unwrap_or({
|
||||||
|
error!("Cannot parse grabbed metrics data to String");
|
||||||
|
String::from(r#""value" : null"#)
|
||||||
|
});
|
||||||
|
dbg!(&metrics);
|
||||||
|
match Exporter::export_metrics(&metrics).await {
|
||||||
|
Ok(_) => {
|
||||||
|
info!("Successfully imported metrics data to Prometheus");
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Failed to export data to Prometheus due to {}", er);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("Bad response from {}. No data", &metrics.url);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(er) => {
|
||||||
|
error!("Cannot API data from {} due to : {}", &metrics.url, er);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
pub async fn process_endpoint(
|
pub async fn process_endpoint(
|
||||||
client : Arc<Client>,
|
// client : Arc<Client>,
|
||||||
config : Arc<ConfigEndpoint>,
|
config : Arc<ConfigEndpoint>,
|
||||||
creds : Credentials,
|
creds : Credentials,
|
||||||
exporter : Arc<Exporter>
|
// exporter : Arc<Exporter>
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
//
|
||||||
|
let period = config.get_period().unwrap_or(0);
|
||||||
|
let timeout = config.get_timeout().unwrap_or(5);
|
||||||
|
let metrics = Arc::new(config.metrics.clone());
|
||||||
|
let service_id = Arc::new(config.id.clone());
|
||||||
|
loop {
|
||||||
|
// let exporter = exporter.clone();
|
||||||
|
let creds = creds.clone();
|
||||||
|
let metrics = metrics.clone();
|
||||||
|
let service_id = service_id.clone();
|
||||||
|
let mut jh = Vec::<JoinHandle::<Result<()>>>::new();
|
||||||
|
|
||||||
|
for idx in 0..metrics.len() {
|
||||||
|
// let exporter = exporter.clone();
|
||||||
|
let creds = creds.clone();
|
||||||
|
let metrics = metrics.clone();
|
||||||
|
let service_id = service_id.clone();
|
||||||
|
let event = tokio::spawn(async move {
|
||||||
|
Self::process_metrics(
|
||||||
|
service_id.clone(),
|
||||||
|
metrics[idx].clone().into(),
|
||||||
|
creds.clone(),
|
||||||
|
// exporter.clone()
|
||||||
|
).await
|
||||||
|
});
|
||||||
|
jh.push(event);
|
||||||
|
}
|
||||||
|
info!("Initializing another {} subjob(s) for `{}` service",
|
||||||
|
jh.len(),
|
||||||
|
&service_id
|
||||||
|
);
|
||||||
|
|
||||||
|
for i in jh {
|
||||||
|
let _ = i.await;
|
||||||
|
}
|
||||||
|
// processing
|
||||||
|
sleep(Duration::from_secs(timeout)).await
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
|
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
|
||||||
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
|
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
|
||||||
// let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
// let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
let client = Arc::new(self.client.clone());
|
// let client = Arc::new(self.client.clone());
|
||||||
let config = Arc::new(self.config.clone());
|
let config = Arc::new(self.config.clone());
|
||||||
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
|
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
|
||||||
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
|
|
@ -106,18 +215,19 @@ impl<'a> ApiPoll<'a> {
|
||||||
// let for_creds = endpoints[idx].clone();
|
// let for_creds = endpoints[idx].clone();
|
||||||
let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
|
let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
|
||||||
let endpoint = endpoints[idx].clone();
|
let endpoint = endpoints[idx].clone();
|
||||||
let client = client.clone();
|
// let client = client.clone();
|
||||||
let exporter = exporter.clone();
|
let exporter = exporter.clone();
|
||||||
let join_handler = tokio::spawn(async move {
|
let join_handler = tokio::spawn(async move {
|
||||||
Self::process_endpoint(
|
Self::process_endpoint(
|
||||||
client,
|
// client,
|
||||||
endpoint,
|
endpoint,
|
||||||
creds,
|
creds,
|
||||||
exporter.clone()
|
// exporter.clone()
|
||||||
).await
|
).await
|
||||||
});
|
});
|
||||||
join_handles.push(join_handler);
|
join_handles.push(join_handler);
|
||||||
}
|
}
|
||||||
|
info!("Initializing {} task(s) for current config", join_handles.len());
|
||||||
for i in join_handles {
|
for i in join_handles {
|
||||||
let _ = i.await;
|
let _ = i.await;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -152,14 +152,14 @@ pub mod v3 {
|
||||||
pub use super::*;
|
pub use super::*;
|
||||||
|
|
||||||
// in config
|
// in config
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
pub struct Metric {
|
pub struct Metric {
|
||||||
pub id : String,
|
pub id : String,
|
||||||
#[serde(rename = "type")]
|
#[serde(rename = "type")]
|
||||||
pub json_type : String,
|
pub json_type : String,
|
||||||
pub addr : String,
|
pub addr : String,
|
||||||
}
|
}
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
pub struct Metrics {
|
pub struct Metrics {
|
||||||
pub name : String,
|
pub name : String,
|
||||||
pub url : String,
|
pub url : String,
|
||||||
|
|
@ -167,17 +167,17 @@ pub mod v3 {
|
||||||
pub measure : Vec<Metric>
|
pub measure : Vec<Metric>
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
pub struct ConfigEndpoint {
|
pub struct ConfigEndpoint {
|
||||||
id : String,
|
pub id : String,
|
||||||
login : String,
|
pub login : String,
|
||||||
#[serde(rename = "pass")]
|
#[serde(rename = "pass")]
|
||||||
password : String,
|
pub password : String,
|
||||||
api_key : String,
|
pub api_key : String,
|
||||||
period : String,
|
period : String,
|
||||||
timeout : String,
|
timeout : String,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
metrics : Vec<Metrics>,
|
pub metrics : Vec<Metrics>,
|
||||||
}
|
}
|
||||||
impl ConfigEndpoint {
|
impl ConfigEndpoint {
|
||||||
pub fn from_config(config: Arc<Config>) -> Vec<Arc<Self>> {
|
pub fn from_config(config: Arc<Config>) -> Vec<Arc<Self>> {
|
||||||
|
|
@ -192,12 +192,12 @@ pub mod v3 {
|
||||||
pub fn get_period(&self) -> Option<u32> {
|
pub fn get_period(&self) -> Option<u32> {
|
||||||
self.period.parse().ok()
|
self.period.parse().ok()
|
||||||
}
|
}
|
||||||
pub fn get_timeout(&self) -> Option<u32> {
|
pub fn get_timeout(&self) -> Option<u64> {
|
||||||
self.timeout.parse().ok()
|
self.timeout.parse().ok()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub config : Vec<ConfigEndpoint>,
|
pub config : Vec<ConfigEndpoint>,
|
||||||
}
|
}
|
||||||
|
|
@ -216,18 +216,24 @@ pub mod v3 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Credentials {
|
pub struct Credentials {
|
||||||
endpoint : Arc<ConfigEndpoint>,
|
pub endpoint : Arc<ConfigEndpoint>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Credentials {
|
impl Credentials {
|
||||||
pub fn from_config_endpoint(endpoint: Arc<ConfigEndpoint>) -> Credentials {
|
pub fn from_config_endpoint(endpoint: Arc<ConfigEndpoint>) -> Credentials {
|
||||||
Self { endpoint }
|
Self { endpoint }
|
||||||
}
|
}
|
||||||
|
// pub fn clone(self) -> Self {
|
||||||
|
// Self {
|
||||||
|
// endpoint : self.endpoint.clone()
|
||||||
|
// }
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// to prometheus and nmns
|
// to prometheus and nmns
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub struct MetricOutput {
|
pub struct MetricOutput {
|
||||||
id : String,
|
id : String,
|
||||||
#[serde(rename = "type")]
|
#[serde(rename = "type")]
|
||||||
|
|
@ -245,4 +251,11 @@ pub mod v3 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct PrometheusMetrics {
|
||||||
|
service_name: String,
|
||||||
|
endpoint_name: String,
|
||||||
|
metrics: Value,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue