import { WebSocketGateway, WebSocketServer, OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect, SubscribeMessage } from '@nestjs/websockets'; import { Server, Socket } from 'socket.io'; import { PrometheusService } from './prometheus.service'; import { Logger } from '@nestjs/common'; @WebSocketGateway({ cors: { origin: process.env.FRONTEND_URL, methods: ['GET', 'POST'], credentials: true }, namespace: '/api/metrics-ws' }) export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { @WebSocketServer() server: Server; private readonly logger = new Logger(MetricsGateway.name); private activeSockets: Map = new Map(); private metricSubscriptions = new Map void; clients: Set; }>(); constructor(private readonly prometheusService: PrometheusService) { } afterInit(server: Server) { this.logger.log('WebSocket Gateway initialized'); } handleConnection(client: Socket) { this.logger.log(`Client connected: ${client.id}`); this.activeSockets.set(client.id, client); } handleDisconnect(client: Socket) { this.logger.log(`Client disconnected: ${client.id}`); this.activeSockets.delete(client.id); } @SubscribeMessage('get-metrics') async handleGetMetrics(client: Socket, payload: any) { const { metric, start, end, step, isRangeQuery } = payload; try { if (isRangeQuery) { // Обработка разового запроса const data = await this.prometheusService.fetchMetricsRange(metric, start, end, step); client.emit(`metrics-range-${metric}`, data); return; } // Исправлено: передаем функцию, а не клиента const stopUpdates = await this.sendPeriodicUpdates( metric, step || 5000, (data) => { client.emit('metrics-data', { metric, data }); } ); client.on('disconnect', () => stopUpdates()); client.on('unsubscribe-metric', () => stopUpdates()); } catch (error) { this.logger.error(`Error fetching metrics: ${error.message}`); client.emit('metrics-error', { metric, error: error.message }); } } @SubscribeMessage('subscribe-metric') async handleSubscribeMetric(client: Socket, payload: { metric: string, interval?: number }) { const { metric } = payload; if (!this.metricSubscriptions.has(metric)) { const stopUpdates = await this.sendPeriodicUpdates( metric, payload.interval || 5000, (data) => { this.server.emit('metrics-data', { metric, data }); } ); this.metricSubscriptions.set(metric, { stopUpdates, clients: new Set([client.id]) }); } else { // Исправлено: добавлена проверка на существование подписки const subscription = this.metricSubscriptions.get(metric); if (subscription) { subscription.clients.add(client.id); } } const unsubscribe = () => { const subscription = this.metricSubscriptions.get(metric); if (subscription) { subscription.clients.delete(client.id); if (subscription.clients.size === 0) { subscription.stopUpdates(); this.metricSubscriptions.delete(metric); } } }; client.on('disconnect', unsubscribe); client.on('unsubscribe-metric', unsubscribe); } // Метод для периодической отправки обновлений async sendPeriodicUpdates(metric: string, interval: number, callback: (data: any) => void) { const timer = setInterval(async () => { try { const data = await this.prometheusService.fetchMetrics(metric); callback(data); // Используем переданный callback } catch (error) { this.logger.error(`Error in periodic update for ${metric}: ${error.message}`); } }, interval); return () => { clearInterval(timer); this.logger.log(`Stopped updates for ${metric}`); }; } }