diff --git a/.env b/.env index 71e5cb2..83bf5f9 100644 --- a/.env +++ b/.env @@ -35,4 +35,4 @@ #CLICKHOUSE_DB=zvks # Для ai api -#ANALYSIS_API_URL=http://192.168.2.39:5134/models/api/metrics/rest \ No newline at end of file +#AI_SERVICE_URL=http://192.168.2.39:5134 \ No newline at end of file diff --git a/src/clickhouse/clickhouse.controller.ts b/src/clickhouse/clickhouse.controller.ts index 6a13f41..7cc34b0 100644 --- a/src/clickhouse/clickhouse.controller.ts +++ b/src/clickhouse/clickhouse.controller.ts @@ -1,11 +1,18 @@ -import { Controller, Get } from '@nestjs/common'; +import { Controller, Get, Post } from '@nestjs/common'; import { ClickHouseService } from './clickhouse.service'; import { ApiOperation, ApiResponse, ApiTags } from '@nestjs/swagger'; +import { HttpService } from '@nestjs/axios'; +import { firstValueFrom } from 'rxjs'; +import { ConfigService } from '@nestjs/config'; @ApiTags('Clickhouse') @Controller('clickhouse') export class ClickHouseController { - constructor(private readonly clickhouseService: ClickHouseService) { } + constructor( + private readonly clickhouseService: ClickHouseService, + private readonly httpService: HttpService, + private readonly configService: ConfigService, + ) { } @Get() @ApiOperation({ summary: 'Get metrics from ClickHouse' }) @@ -32,4 +39,24 @@ export class ClickHouseController { async getClckhouse() { return this.clickhouseService.getClckhouse(); } + + @Post('send-to-ai') + @ApiOperation({ summary: 'Send metrics to AI service' }) + @ApiResponse({ + status: 200, + description: 'AI service response', + }) + async sendToAI() { + const metrics = await this.clickhouseService.getClckhouse(); + const aiServiceUrl = this.configService.get('AI_SERVICE_URL/api/metrics/rest'); + + try { + const response = await firstValueFrom( + this.httpService.post(aiServiceUrl, metrics) + ); + return response.data; + } catch (error) { + throw new Error(`Failed to send data to AI: ${error.message}`); + } + } } \ No newline at end of file diff --git a/src/clickhouse/clickhouse.module.ts b/src/clickhouse/clickhouse.module.ts index 64e5ac4..2dc3d30 100644 --- a/src/clickhouse/clickhouse.module.ts +++ b/src/clickhouse/clickhouse.module.ts @@ -1,9 +1,11 @@ import { Module, Global } from '@nestjs/common'; import { createClient, ClickHouseClient } from '@clickhouse/client'; import { ClickHouseService } from './clickhouse.service'; +import { HttpModule } from '@nestjs/axios'; @Global() @Module({ + imports: [HttpModule], providers: [ { provide: 'CLICKHOUSE_CLIENT', diff --git a/src/main.ts b/src/main.ts index e3064aa..a97b0c4 100644 --- a/src/main.ts +++ b/src/main.ts @@ -42,7 +42,7 @@ async function bootstrap() { }); // Настройка CORS - app.enableCors({ + app.enableCors({//ПОСТАВИТЬ ПРОКСИ, ЧТОБЫ КОРС НЕ РУГАЛСЯ, ИЗМЕНЕНИЕ ПОЛИТИКИ СЕТЕВЫХ ПАКЕТОВ. ПИШУ IP СВОЙ, А ПОРТ ПРОКСИ. REVERSE PROXY. origin: [process.env.FRONTEND_URL, "http://dev.msf.enode"], credentials: true, methods: 'GET,HEAD,PUT,PATCH,POST,DELETE,OPTIONS', diff --git a/src/menu/menu.controller.ts b/src/menu/menu.controller.ts index 499ee08..ace33d2 100644 --- a/src/menu/menu.controller.ts +++ b/src/menu/menu.controller.ts @@ -5,27 +5,34 @@ import { MenuItem } from './menu.interface'; @Controller('menu') export class MenuController { constructor(private readonly menuService: MenuService) { } - + + @Get('full') + async getFullMenu(@Headers('if-modified-since') ifModifiedSince?: string) { + console.log('GET /menu/full requested'); + try { + const result = await this.menuService.getFullMenuWithCache(ifModifiedSince); + + if (!result.fresh && ifModifiedSince) { + throw new HttpException('Not Modified', HttpStatus.NOT_MODIFIED); + } + + return result.menu; + } catch (error) { + if (error.status === HttpStatus.NOT_MODIFIED) { + throw error; + } + throw new HttpException( + error.message || 'Failed to load menu', + HttpStatus.INTERNAL_SERVER_ERROR + ); + } + } +/* @Get('full') - async getFullMenu(@Headers('if-modified-since') ifModifiedSince?: string) { - try { - const result = await this.menuService.getFullMenuWithCache(ifModifiedSince); - - if (!result.fresh && ifModifiedSince) { - throw new HttpException('Not Modified', HttpStatus.NOT_MODIFIED); - } - - return result.menu; - } catch (error) { - if (error.status === HttpStatus.NOT_MODIFIED) { - throw error; - } - throw new HttpException( - error.message || 'Failed to load menu', - HttpStatus.INTERNAL_SERVER_ERROR - ); - } - } + async getFullMenu() { + console.log('Simplified endpoint called'); + return { test: 'OK' }; // Простейший ответ + } */ @Get('check-updates') async checkUpdates(@Headers('if-modified-since') ifModifiedSince: string) { diff --git a/src/prometheus/metrics.gateway.ts b/src/prometheus/metrics.gateway.ts index f545f27..9e76e23 100644 --- a/src/prometheus/metrics.gateway.ts +++ b/src/prometheus/metrics.gateway.ts @@ -26,6 +26,7 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat stopUpdates: () => void; clients: Set; }>(); + private lastSentData = new Map(); // Кэш последних отправленных данных constructor(private readonly prometheusService: PrometheusService) { } @@ -48,6 +49,7 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat if (subscription.clients.size === 0) { subscription.stopUpdates(); this.metricSubscriptions.delete(metric); + this.lastSentData.delete(metric); } } } @@ -59,6 +61,7 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat if (subscription.clients.size === 0) { subscription.stopUpdates(); this.metricSubscriptions.delete(metric); + this.lastSentData.delete(metric); } } } @@ -90,17 +93,28 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat } try { + const subscriptionKey = this.getSubscriptionKey(metric, filters); + // Отправляем текущие данные сразу при запросе + const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); + client.emit('metrics-data', { metric, data: initialData, requestId }); + this.lastSentData.set(subscriptionKey, initialData); + const stopUpdates = await this.sendPeriodicUpdates( metric, step || 5000, (data) => { - client.emit('metrics-data', { metric, data, requestId }); + const lastData = this.lastSentData.get(subscriptionKey); + if (!this.isDataEqual(lastData, data)) { + client.emit('metrics-data', { metric, data, requestId }); + this.lastSentData.set(subscriptionKey, data); + } }, filters ); const cleanup = () => { stopUpdates(); + this.lastSentData.delete(subscriptionKey); client.off('disconnect', cleanup); client.off('unsubscribe-metric', cleanup); }; @@ -120,7 +134,21 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat const filterKeys = Object.keys(filters).sort(); const filterString = filterKeys.map(k => `${k}=${encodeURIComponent(filters[k])}`).join('&'); return `${metric}${filterString ? `?${filterString}` : ''}`; - } + } + + // Сравниваем данные, чтобы избежать лишних отправок + private isDataEqual(oldData: any[], newData: any[]): boolean { + if (!oldData || !newData || oldData.length !== newData.length) return false; + + return oldData.every((oldItem, index) => { + const newItem = newData[index]; + return ( + oldItem.value === newItem.value && + oldItem.status === newItem.status && + oldItem.timestamp === newItem.timestamp + ); + }); + } @SubscribeMessage('subscribe-metric') async handleSubscribeMetric( @@ -131,18 +159,35 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat filters?: Record; } ) { - const { metric, interval = 5000, filters = {} } = payload; + const { metric, interval = 60000, filters = {} } = payload; // По умолчанию 60 секунд const subscriptionKey = this.getSubscriptionKey(metric, filters); if (!this.metricSubscriptions.has(subscriptionKey)) { + // Отправляем текущие данные сразу при подписке + try { + const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); + client.emit('metrics-data', { + metric: subscriptionKey, + data: initialData + }); + this.lastSentData.set(subscriptionKey, initialData); + } catch (error) { + this.logger.error(`Error fetching initial data for ${metric}:`, error.message); + } + const stopUpdates = await this.sendPeriodicUpdates( metric, interval, (data) => { - this.server.emit('metrics-data', { - metric: subscriptionKey, - data - }); + // Отправляем только если данные изменились + const lastData = this.lastSentData.get(subscriptionKey); + if (!this.isDataEqual(lastData, data)) { + this.server.emit('metrics-data', { + metric: subscriptionKey, + data + }); + this.lastSentData.set(subscriptionKey, data); + } }, filters ); @@ -153,6 +198,14 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat }); } else { this.metricSubscriptions.get(subscriptionKey)?.clients.add(client.id); + // Отправляем кэшированные данные новому клиенту + const cachedData = this.lastSentData.get(subscriptionKey); + if (cachedData) { + client.emit('metrics-data', { + metric: subscriptionKey, + data: cachedData + }); + } } const unsubscribe = () => { @@ -162,6 +215,7 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat if (subscription.clients.size === 0) { subscription.stopUpdates(); this.metricSubscriptions.delete(subscriptionKey); + this.lastSentData.delete(subscriptionKey); } } }; @@ -176,6 +230,11 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat callback: (data: any) => void, filters: Record = {} ) { + // Добавляем небольшую случайную задержку, чтобы избежать пиковой нагрузки + const initialDelay = Math.floor(Math.random() * 5000); + + await new Promise(resolve => setTimeout(resolve, initialDelay)); + const timer = setInterval(async () => { try { const data = await this.prometheusService.fetchMetricsWithFilters(metric, filters); @@ -187,7 +246,8 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat return () => { clearInterval(timer); + this.lastSentData.delete(this.getSubscriptionKey(metric, filters)); this.logger.log(`Stopped updates for ${metric}`); }; } -} +} \ No newline at end of file diff --git a/src/prometheus/prometheus.service.ts b/src/prometheus/prometheus.service.ts index 136d766..396e388 100644 --- a/src/prometheus/prometheus.service.ts +++ b/src/prometheus/prometheus.service.ts @@ -8,6 +8,8 @@ import { MenuItem } from '../menu/menu.interface'; @Injectable() export class PrometheusService { private readonly prometheusUrl: string; + private metricCache = new Map(); + private metadataCache = new Map(); constructor( private readonly httpService: HttpService, @@ -18,6 +20,13 @@ export class PrometheusService { } async fetchMetricType(metric: string): Promise { + const cacheKey = `metadata-type-${metric}`; + const cacheEntry = this.metadataCache.get(cacheKey); + + if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) { + return cacheEntry.type; + } + try { const response = await lastValueFrom( this.httpService.get(`${this.prometheusUrl}/metadata`, { @@ -25,14 +34,29 @@ export class PrometheusService { }) ); const metadata = response.data.data[metric]; - return metadata?.length ? metadata[0].type : null; + const result = metadata?.length ? metadata[0].type : null; + + this.metadataCache.set(cacheKey, { + type: result, + description: cacheEntry?.description, + timestamp: Date.now() + }); + + return result; } catch (error) { console.error(`Ошибка при получении типа метрики ${metric}:`, error); - return null; + return cacheEntry?.type || null; } } async fetchMetricDescription(metric: string): Promise { + const cacheKey = `metadata-description-${metric}`; + const cacheEntry = this.metadataCache.get(cacheKey); + + if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) { + return cacheEntry.description; + } + try { const response = await lastValueFrom( this.httpService.get(`${this.prometheusUrl}/metadata`, { @@ -40,14 +64,29 @@ export class PrometheusService { }) ); const metadata = response.data.data[metric]; - return metadata?.length ? metadata[0].help : undefined; + const result = metadata?.length ? metadata[0].help : undefined; + + this.metadataCache.set(cacheKey, { + type: cacheEntry?.type ?? null, + description: result, + timestamp: Date.now() + }); + + return result; } catch (error) { console.error(`Ошибка при получении описания метрики ${metric}:`, error); - return undefined; + return cacheEntry?.description; } } async fetchMetrics(metric: string): Promise { + const cacheKey = `${metric}:{}`; + const cacheEntry = this.metricCache.get(cacheKey); + + if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) { + return cacheEntry.data; + } + try { const response = await lastValueFrom( this.httpService.get(`${this.prometheusUrl}/query`, { @@ -58,7 +97,7 @@ export class PrometheusService { const metricType = await this.fetchMetricType(metric); const metricDescription = await this.fetchMetricDescription(metric); - return response.data.data.result.map((entry): PrometheusMetric => ({ + const result = response.data.data.result.map((entry): PrometheusMetric => ({ __name__: entry.metric.__name__ || metric, device: entry.metric.device, instance: entry.metric.instance, @@ -71,13 +110,24 @@ export class PrometheusService { description: metricDescription, ...entry.metric })); + + this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() }); + return result; } catch (error) { console.error(`Error fetching metrics for ${metric}:`, error); + if (cacheEntry) return cacheEntry.data; throw error; } } async fetchMetricsWithFilters(metric: string, filters: Record): Promise { + const cacheKey = `${metric}:${JSON.stringify(filters)}`; + const cacheEntry = this.metricCache.get(cacheKey); + + if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) { + return cacheEntry.data; + } + try { const query = this.buildFilteredQuery(metric, filters); const response = await lastValueFrom( @@ -89,7 +139,7 @@ export class PrometheusService { const metricType = await this.fetchMetricType(metric); const metricDescription = await this.fetchMetricDescription(metric); - return response.data.data.result.map((entry): PrometheusMetric => ({ + const result = response.data.data.result.map((entry): PrometheusMetric => ({ __name__: entry.metric.__name__ || metric, device: entry.metric.device, instance: entry.metric.instance, @@ -102,8 +152,12 @@ export class PrometheusService { description: metricDescription, ...entry.metric })); + + this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() }); + return result; } catch (error) { console.error(`Error fetching metrics with filters for ${metric}:`, error); + if (cacheEntry) return cacheEntry.data; throw error; } } @@ -112,7 +166,6 @@ export class PrometheusService { const filterParts = Object.entries(filters) .filter(([_, value]) => value !== undefined && value !== null && value !== "") .map(([key, value]) => { - // Убираем автоматическое добавление "module$" для source_id return `${key}="${value}"`; }); @@ -122,10 +175,15 @@ export class PrometheusService { } async fetchMetricsRange(metric: string, start: number, end: number, step: number, filters: Record = {}): Promise { + // Рассчитываем оптимальный шаг, если не указан + const duration = end - start; + const optimalStep = Math.max(Math.floor(duration / 1000), 15); // Минимум 15 секунд + const query = this.buildFilteredQuery(metric, { ...filters, instance: '192.168.2.34:9050' }); + try { const response = await lastValueFrom( this.httpService.get(`${this.prometheusUrl}/query_range`, { @@ -133,7 +191,7 @@ export class PrometheusService { query, start, end, - step: step.toString() + step: optimalStep.toString() }, }) ); @@ -217,21 +275,40 @@ export class PrometheusService { } async fetchAllMetrics(): Promise { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/label/__name__/values`) - ); - return response.data.data; + try { + const response = await lastValueFrom( + this.httpService.get(`${this.prometheusUrl}/label/__name__/values`) + ); + return response.data.data; + } catch (error) { + console.error('Error fetching all metrics:', error); + return []; + } } async fetchAllMetricsWithValues(): Promise { const metricNames = await this.fetchAllMetrics(); - const zvksMetrics = metricNames.filter(metric => metric.startsWith('zvks')); + const zvksMetrics = metricNames.filter(metric => + metric.startsWith('zvks') || + metric.includes('server_li') || + metric.includes('application_li') + ); const promises = zvksMetrics.map(async (metric) => { - const data = await this.fetchMetrics(metric); - return { metric, data }; + try { + const data = await this.fetchMetrics(metric); + return { metric, data }; + } catch (error) { + console.error(`Error fetching data for metric ${metric}:`, error); + return { metric, data: [] }; + } }); return Promise.all(promises); } -} + + clearCache(): void { + this.metricCache.clear(); + this.metadataCache.clear(); + } +} \ No newline at end of file