import { WebSocketGateway, WebSocketServer, OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect, SubscribeMessage, } from '@nestjs/websockets'; import { Server, Socket } from 'socket.io'; import { PrometheusService } from './prometheus.service'; import { Logger } from '@nestjs/common'; @WebSocketGateway({ cors: { origin: process.env.FRONTEND_URL, methods: ['GET', 'POST'], credentials: true }, namespace: '/api/metrics-ws' }) export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { @WebSocketServer() server: Server; private readonly logger = new Logger(MetricsGateway.name); private activeSockets: Map = new Map(); private metricSubscriptions = new Map void; clients: Set; }>(); constructor(private readonly prometheusService: PrometheusService) { } afterInit(server: Server) { this.logger.log('WebSocket Gateway initialized'); } handleConnection(client: Socket) { this.logger.log(`Client connected: ${client.id}`); this.activeSockets.set(client.id, client); } handleDisconnect(client: Socket) { this.logger.log(`Client disconnected: ${client.id}`); this.activeSockets.delete(client.id); // Очистка всех подписок этого клиента for (const [metric, subscription] of this.metricSubscriptions) { subscription.clients.delete(client.id); if (subscription.clients.size === 0) { subscription.stopUpdates(); this.metricSubscriptions.delete(metric); } } } @SubscribeMessage('unsubscribe-all') handleUnsubscribeAll(client: Socket) { for (const [metric, subscription] of this.metricSubscriptions) { subscription.clients.delete(client.id); if (subscription.clients.size === 0) { subscription.stopUpdates(); this.metricSubscriptions.delete(metric); } } } @SubscribeMessage('get-metrics') async handleGetMetrics(client: Socket, payload: any) { const { metric, start, end, step, isRangeQuery, requestId, filters = {} } = payload; if (!metric) { client.emit('metrics-error', { error: 'Metric name is required', requestId }); return; } if (isRangeQuery) { try { const data = await this.prometheusService.fetchMetricsRange(metric, start, end, step, filters); client.emit('metrics-data', { metric, data, requestId }); return; } catch (error) { client.emit('metrics-error', { error: error.message, requestId }); return; } } try { const stopUpdates = await this.sendPeriodicUpdates( metric, step || 5000, (data) => { client.emit('metrics-data', { metric, data, requestId }); }, filters ); const cleanup = () => { stopUpdates(); client.off('disconnect', cleanup); client.off('unsubscribe-metric', cleanup); }; client.on('disconnect', cleanup); client.on('unsubscribe-metric', cleanup); } catch (error) { client.emit('metrics-error', { error: error.message, requestId }); } } private getSubscriptionKey(metric: string, filters: Record): string { // Создаём уникальный ключ на основе метрики и фильтров const filterKeys = Object.keys(filters).sort(); const filterString = filterKeys.map(k => `${k}=${filters[k]}`).join('&'); return `${metric}${filterString ? `?${filterString}` : ''}`; } @SubscribeMessage('subscribe-metric') async handleSubscribeMetric( client: Socket, payload: { metric: string; interval?: number; filters?: Record; } ) { const { metric, interval = 5000, filters = {} } = payload; const subscriptionKey = this.getSubscriptionKey(metric, filters); if (!this.metricSubscriptions.has(subscriptionKey)) { const stopUpdates = await this.sendPeriodicUpdates( metric, interval, (data) => { // Отправляем только подписчикам этой конкретной метрики с фильтрами this.server.emit('metrics-data', { metric: subscriptionKey, data }); }, filters ); this.metricSubscriptions.set(subscriptionKey, { stopUpdates, clients: new Set([client.id]) }); } else { this.metricSubscriptions.get(subscriptionKey)?.clients.add(client.id); } const unsubscribe = () => { const subscription = this.metricSubscriptions.get(subscriptionKey); if (subscription) { subscription.clients.delete(client.id); if (subscription.clients.size === 0) { subscription.stopUpdates(); this.metricSubscriptions.delete(subscriptionKey); } } }; client.on('disconnect', unsubscribe); client.on('unsubscribe-metric', unsubscribe); } async sendPeriodicUpdates( metric: string, interval: number, callback: (data: any) => void, filters: Record = {} ) { const timer = setInterval(async () => { try { const data = await this.prometheusService.fetchMetricsWithFilters(metric, filters); callback(data); } catch (error) { this.logger.error(`Error in periodic update for ${metric}:`, error.message); } }, interval); return () => { clearInterval(timer); this.logger.log(`Stopped updates for ${metric}`); }; } }