base func

master
prplV 2025-06-24 10:11:46 -04:00
parent 6805c7ac94
commit ddea3b51bb
6 changed files with 212 additions and 74 deletions

View File

@@ -9,9 +9,10 @@ axum = { version = "0.8.4", features = ["ws"] }
dotenv = "0.15.0"
futures = "0.3.31"
lazy_static = "1.5.0"
reqwest = { version = "0.12.20", features = ["rustls-tls"] }
reqwest = { version = "0.12.20", features = ["json", "rustls-tls"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
tokio = { version = "1.45.1", features = ["full"] }
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
utoipa-axum = "0.2.0"

View File

@@ -22,11 +22,11 @@ ML-API is a high-performance Rust service designed to process system metrics, an
Create a `.env` file in your project root:
```env
# ML API target URL (e.g., Ollama would use: http://localhost:11434/api/generate)
ML_TARGET_URL="http://your-ml-backend/api"
# Log level (TRACE|DEBUG|INFO|WARN|ERROR|OFF)
ML_LOG_LEVEL="INFO"
ML_TARGET_URL="http://url.to/ml/api"
ML_MODEL_NAME="kis-test"
ML_REQUEST_TIMEOUT="10"
ML_API_LOG_LEVEL="INFO"
ML_API_PORT="5134"
```
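These variables are read once at startup (see `ApiSessionConfig::from_env` later in this commit). A minimal sketch of doing the same with plain `std::env`, assuming the variable names above and falling back to the sample's values when an entry is missing or malformed:
```rust
// Illustrative only; the service's own loading lives in ApiSessionConfig::from_env.
fn read_env_config() -> Result<(String, String, u64, String), std::env::VarError> {
    let target_url = std::env::var("ML_TARGET_URL")?;   // ML backend endpoint
    let model_name = std::env::var("ML_MODEL_NAME")?;   // model to query
    let request_timeout: u64 = std::env::var("ML_REQUEST_TIMEOUT")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(10);                                  // seconds
    let port = std::env::var("ML_API_PORT").unwrap_or_else(|_| "5134".to_string());
    Ok((target_url, model_name, request_timeout, port))
}
```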
### Installation
@@ -64,10 +64,8 @@ Send your metrics as a JSON array in the following format:
{
"id": "10001",
"name": "cpu_utilization",
"type": "i64",
"addr": "enode.monitoring.api",
"value": null,
"description": "cpu_utilization",
"value": 1.23,
"status": 0,
"device": 18,
"source": "module$11"

View File

@@ -1,5 +1,13 @@
use axum::response::IntoResponse;
use axum::http::StatusCode;
use axum::{
response::IntoResponse,
http::StatusCode,
extract::{ Json, State },
};
use crate::{models::ModelResponse, schemas::InputMetric};
use crate::models::{ApiSessionConfig, OutputModel};
use std::sync::Arc;
type Config = Arc<ApiSessionConfig>;
pub mod openapi {
use super::{IntoResponse, StatusCode};
@@ -8,13 +16,97 @@ pub mod openapi {
}
pub mod rest {
use super::{IntoResponse, StatusCode};
use tracing::trace;
use super::{IntoResponse, StatusCode, InputMetric, Json, Config, State};
pub async fn model_rest_handler() -> impl IntoResponse { (StatusCode::NOT_IMPLEMENTED, "still in development ...") }
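/// Accepts a JSON array of InputMetric, forwards it to the ML backend, and returns the model's plain-text assessment, or a 500 with the error text.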
pub async fn model_rest_handler(
State(config) : State<Config>,
Json(req) : Json<Vec<InputMetric>>,
) -> impl IntoResponse {
trace!("GET on /api/metrics/rest");
match super::send_message_to_model(config.clone(), req).await {
Ok(resp) => (StatusCode::OK, resp),
Err(er) => (StatusCode::INTERNAL_SERVER_ERROR, format!("cannot get model response: {er}")),
}
}
}
pub mod ws {
use super::{IntoResponse, StatusCode};
use axum::extract::{ws::{Message, WebSocket}, WebSocketUpgrade};
use tracing::trace;
use super::{IntoResponse, InputMetric, Config, State};
pub async fn model_ws_handler() -> impl IntoResponse { (StatusCode::NOT_IMPLEMENTED, "still in development ...") }
pub async fn model_ws_handler(
State(config): State<Config>,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
trace!("working with new WebSocket connection ...");
ws.on_upgrade(|socket| model_ws_worker(config, socket))
}
async fn model_ws_worker(
config: Config,
mut ws: WebSocket,
) {
trace!("handling WebSocket connection ...");
let ws_receiver = tokio::spawn(async move {
while let Some(Ok(msg)) = ws.recv().await {
match msg.to_text() {
Err(er) => {
let _ = ws.send(Message::Text(format!("Cannot convert input message: {er}").into())).await;
},
Ok(msg) => {
match serde_json::from_str::<Vec<InputMetric>>(msg) {
Err(er) => {
let _ = ws.send(
Message::Text(
format!("Cannot convert input message: {er}")
.into()
)
).await;
},
Ok(req) => {
match super::send_message_to_model(config.clone(), req).await {
Ok(resp) => {
let _ = ws.send(Message::Text(resp.into())).await;
},
Err(er) => {
let _ = ws.send(Message::Text(format!("Cannot get model's response: {er}").into())).await;
},
}
},
}
},
}
}
});
let _ = ws_receiver.await;
}
}
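/// Combines the metric batch with the shared PROMPT into an OutputModel request, sends it to the configured backend, and returns the model's text response.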
async fn send_message_to_model(
config: Config,
req: Vec<InputMetric>
) -> anyhow::Result<String> {
let prompt = OutputModel::build(
&config.model_name,
format!("{} {}", serde_json::to_string(&req)?, &*crate::PROMPT)
);
let request = config
.client
.post(&config.target_url)
.json(&prompt)
.timeout(tokio::time::Duration::from_secs(config.request_timeout as u64));
Ok(
request
.send()
.await?
.json::<ModelResponse>()
.await?
.response
)
}

View File

@@ -4,7 +4,7 @@ mod models;
mod setup;
use std::sync::Arc;
use tracing::info;
use tracing::{debug, info, trace};
use dotenv::dotenv;
use endpoints::{
openapi::swagger,
@@ -12,10 +12,15 @@ use endpoints::{
ws::model_ws_handler,
};
use setup::setup_self_config;
use schemas::ApiSessionConfig;
use models::ApiSessionConfig;
use axum::{
routing::get,
Router};
use lazy_static::lazy_static;
lazy_static! {
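// Analysis prompt (kept in Russian because the model is required to answer in Russian): review the whole metric list, paying attention to every metric's status (0 unknown, 1 normal, 2 warning, 3 critical, 4 emergency), and return a detailed plain-text system assessment listing all weak points and remediation recommendations, with no Markdown formatting.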
static ref PROMPT: String = String::from("\n проанализируй ВЕСЬ этот список метрик системы, обращая внимание на статусы КАЖДОЙ МЕТРИКИ:\n 0 - не определен \n 1 - норма \n 2 - предупреждение \n 3 - критично \n 4 - авария \n напиши общую подробную оценку системы с указанием вообще всех слабых мест и рекомендаций по устранению проблем (ВАЖНО!!! : только на русском языке без форматирования и лишнего текста, без комментариев, только оценка и рекомендации) \n ВАЖНО! НЕ ИСПОЛЬЗУЙ MD-форматирование выходного текста, анализируй метрики выше и по ним делай выводы и рекомендации");
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
@@ -25,14 +30,17 @@ async fn main() -> anyhow::Result<()> {
let base_url = format!("0.0.0.0:{}", port);
// routing
trace!("configurating routing ...");
let api = Router::new()
.route("/rest", get(model_rest_handler))
.route("/ws", get(model_ws_handler));
.route("/ws", get(model_ws_handler))
.with_state(app_state.clone());
let router = Router::new()
.nest("/api", api)
.route("/swagger", get(swagger))
.with_state(app_state.clone());
.nest("/api/metrics", api)
.route("/swagger", get(swagger));
debug!("router for app : {:?}", router);
// running
info!("serving on {} ...", &base_url);

View File

@@ -0,0 +1,81 @@
use reqwest::Client;
use tracing::{debug, instrument, warn, info};
use std::fmt;
use serde::{Deserialize, Serialize};
/// Runtime configuration for the ML backend integration, loaded from environment variables.
#[derive(Debug)]
pub struct ApiSessionConfig {
// integration config
pub target_url : String,
pub model_name : String,
pub request_timeout : usize,
pub client : Client,
}
impl fmt::Display for ApiSessionConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
let param_name_width = 25;
let param_name_val = (self.model_name.len() + 4).max(self.target_url.len() + 4);
let param_name_measure = 10;
// header
writeln!(f, "+{:-<param_name_width$}-+-{:-<param_name_val$}+{:-<param_name_measure$}-+",
"", "", "")?;
writeln!(f, "| {: <param_name_width$}| {: <param_name_val$}| {: <param_name_measure$}|",
"Param", "Value", "Measure")?;
writeln!(f, "+{:-<param_name_width$}-+-{:-<param_name_val$}+{:-<param_name_measure$}-+",
"", "", "")?;
// data
writeln!(f, "| {: <param_name_width$}| {: <param_name_val$}| {: <param_name_measure$}|",
"Target model url", self.target_url, "-")?;
writeln!(f, "| {: <param_name_width$}| {: <param_name_val$}| {: <param_name_measure$}|",
"Tagert model name", self.model_name, "-")?;
writeln!(f, "| {: <param_name_width$}| {: <param_name_val$}| {: <param_name_measure$}|",
"API response wait-time", self.request_timeout, "secs")?;
// footer
writeln!(f, "+{:-<param_name_width$}-+-{:-<param_name_val$}+{:-<param_name_measure$}-+",
"", "", "")?;
Ok(())
}
}
impl ApiSessionConfig {
#[instrument(name = "app-config")]
pub fn from_env() -> anyhow::Result<Self> {
let config = Self {
target_url : std::env::var("ML_TARGET_URL")?,
model_name : std::env::var("ML_MODEL_NAME")?,
request_timeout : std::env::var("ML_REQUEST_TIMEOUT")?.parse().unwrap_or_else(|_| {
warn!("invalid port was given, setting up default one ...");
10
}),
client : Client::new(),
};
debug!("{:?}", config);
info!("app is configurated. config :\n{}", config);
Ok(config)
}
}
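/// Request body for the backend's generate endpoint; the model/prompt/stream layout matches, for example, Ollama's /api/generate mentioned in the README.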
#[derive(Debug, Serialize)]
pub struct OutputModel {
model : String,
prompt : String,
stream : bool,
}
impl OutputModel {
pub fn build(model_name: &str, prompt: String) -> Self {
Self {
model : model_name.to_owned(),
prompt,
stream : false,
}
}
}
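/// The part of the backend's JSON reply that the service reads back: the generated text.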
#[derive(Debug, Serialize, Deserialize)]
pub struct ModelResponse {
pub response: String,
}

View File

@@ -1,55 +1,13 @@
use tracing::{debug, instrument, warn, info};
use std::fmt;
use serde::{Serialize, Deserialize};
///
#[derive(Debug)]
pub struct ApiSessionConfig {
// integration config
target_url : String,
model_name : String,
request_timeout : usize,
}
impl fmt::Display for ApiSessionConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
let param_name_width = 25;
let param_name_val = (self.model_name.as_bytes().iter().count() + 4).max(self.target_url.as_bytes().iter().count() + 4);
let param_name_measure = 10;
// header
writeln!(f, "+{:-<param_name_width$}-+-{:-<param_name_val$}+{:-<param_name_measure$}-+",
"", "", "")?;
writeln!(f, "| {: <param_name_width$}| {: <param_name_val$}| {: <param_name_measure$}|",
"Param", "Value", "Measure")?;
writeln!(f, "+{:-<param_name_width$}-+-{:-<param_name_val$}+{:-<param_name_measure$}-+",
"", "", "")?;
// data
writeln!(f, "| {: <param_name_width$}| {: <param_name_val$}| {: <param_name_measure$}|",
"Target model url", self.target_url, "-")?;
writeln!(f, "| {: <param_name_width$}| {: <param_name_val$}| {: <param_name_measure$}|",
"Tagert model name", self.model_name, "-")?;
writeln!(f, "| {: <param_name_width$}| {: <param_name_val$}| {: <param_name_measure$}|",
"API response wait-time", self.request_timeout, "secs")?;
// footer
writeln!(f, "+{:-<param_name_width$}-+-{:-<param_name_val$}+{:-<param_name_measure$}-+",
"", "", "")?;
Ok(())
}
}
impl ApiSessionConfig {
#[instrument("app-config")]
pub fn from_env() -> anyhow::Result<Self> {
let config = Self {
target_url : std::env::var("ML_TARGET_URL")?,
model_name : std::env::var("ML_MODEL_NAME")?,
request_timeout : std::env::var("ML_REQUEST_TIMEOUT")?.parse().unwrap_or_else(|_| {
warn!("invalid port was given, setting up default one ...");
10
}),
};
debug!("{:?}", config);
info!("app is configurated. config :\n{}", config);
Ok(config)
}
#[derive(Debug, Deserialize, Serialize)]
pub struct InputMetric {
id : String,
name : String,
description : String,
value : serde_json::Value,
status : usize,
device : usize,
source : String,
timestamp : u64,
}