+ exporter communication example

pull/6/head
prplV 2025-01-27 14:00:55 +03:00
parent 5e22a5e6b2
commit 21b1f30354
4 changed files with 27 additions and 9 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
/target /target
Cargo.lock Cargo.lock
*.sock *.sock
.env

View File

@ -8,7 +8,8 @@
"method" : "GET", "method" : "GET",
"measure" : "measure" :
[ [
"operation", "response" "response", "test_metric_1", "test_metric_2",
"test_metric_3", "test_metric_4", "test_metric_5"
] ]
}, },
{ {
@ -18,7 +19,9 @@
"method" : "GET", "method" : "GET",
"measure" : "measure" :
[ [
"operation", "response", "empty_field" "response", "test_metric_1", "test_metric_2",
"test_metric_3", "test_metric_4", "test_metric_5",
"empty_field"
] ]
} }
], ],

View File

@ -13,6 +13,6 @@ log = "0.4.25"
anyhow = "1.0.95" anyhow = "1.0.95"
chrono = "0.4.39" chrono = "0.4.39"
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] } reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
deadpool-postgres = "0.14.1" deadpool-postgres = { version = "0.14.1", features = ["serde"] }
tokio-postgres = "0.7.12" tokio-postgres = "0.7.12"
dotenv = "0.15.0" dotenv = "0.15.0"

View File

@ -2,18 +2,17 @@
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use integr_structs::api::{ApiConfigV2, ProcessedEndpoint}; use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
use log::{error, info}; use log::{error, info};
use serde_json::Value;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use reqwest::{Client, Method}; use reqwest::{Client, Method};
use std::sync::Arc; use std::sync::Arc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
// use tokio::sync::Mutex; // use tokio::sync::Mutex;
use tokio_postgres::types::ToSql;
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient}; use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
use tokio_postgres::NoTls; use tokio_postgres::NoTls;
use dotenv::dotenv; use dotenv::dotenv;
use std::env; use std::env;
use serde::{Deserialize, Serialize};
// type BufferType = Arc<Mutex<Vec<String>>>; // type BufferType = Arc<Mutex<Vec<String>>>;
@ -59,8 +58,8 @@ impl Exporter {
} }
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> { pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
// client. // client.
let query = "INSERT INTO metrics (body) VALUES ($1);"; let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
let _ = client.execute(query, &[&query]).await?; let _ = client.query(&query, &[&metrics]).await?;
Ok(()) Ok(())
} }
@ -125,9 +124,24 @@ impl<'a> ApiPoll<'a> {
if let Ok(text) = resp.text().await { if let Ok(text) = resp.text().await {
// //
let metrics = ProcessedEndpoint::from_target_response(&text, &point)?; let metrics = ProcessedEndpoint::from_target_response(&text, &point)?;
// dbg!(&metrics);
println!("{}", &metrics);
// //
if let Some(conn) = exporter.get_connection_from_pool().await { if let Some(conn) = exporter.get_connection_from_pool().await {
if let Err(er) = Exporter::export_data(conn, &metrics).await {
// TEST: to exporter
let res = client.request(
RestMethod::from_str("post").await,
"http://192.168.2.34:9101/update")
.json(&metrics)
.send().await;
if let Err(er) = res {
error!("Cannot send data to exporter due to: {}", er);
} else {
println!("{:?}", res.unwrap().text().await);
}
if let Err(er) = Exporter::export_data(conn, &metrics).await {
error!("Cannot export data to DB during to: {}", er); error!("Cannot export data to DB during to: {}", er);
return Err(Error::msg("Error during exporting data to DB")); return Err(Error::msg("Error during exporting data to DB"));
} }