fixed a bug with multiple web socket connection
test-org/trust-module-backend/pipeline/pr-rc This commit looks good Details

pull/16/head
DmitriyA 2025-04-21 02:56:19 -04:00
parent 2c06038b0e
commit 918656a5b4
6 changed files with 67 additions and 61 deletions

7
.gitignore vendored
View File

@ -52,5 +52,12 @@ pids
*.seed *.seed
*.pid.lock *.pid.lock
# Игнорировать .env файлы
.env
.env.local
.env.development
.env.production
.env.test
# Diagnostic reports (https://nodejs.org/api/report.html) # Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json

View File

@ -43,7 +43,8 @@
"@types/passport-jwt": "^4.0.1", "@types/passport-jwt": "^4.0.1",
"@types/cookie-parser": "^1.4.8", "@types/cookie-parser": "^1.4.8",
"@nestjs/jwt": "^11.0.0", "@nestjs/jwt": "^11.0.0",
"@nestjs/passport": "^11.0.5" "@nestjs/passport": "^11.0.5",
"@nestjs/swagger": "11.1.4"
}, },
"devDependencies": { "devDependencies": {
"@eslint/eslintrc": "^3.2.0", "@eslint/eslintrc": "^3.2.0",

View File

@ -50,7 +50,7 @@ export class AuthController {
const { access_token } = await this.authService.login(user); const { access_token } = await this.authService.login(user);
res.cookie('access_token', access_token, { res.cookie('accecdss_token', access_token, {
httpOnly: true, httpOnly: true,
secure: process.env.COOKIE_SECURE === 'true', secure: process.env.COOKIE_SECURE === 'true',
sameSite: (process.env.COOKIE_SAME_SITE as 'strict' | 'lax' | 'none') || 'strict', sameSite: (process.env.COOKIE_SAME_SITE as 'strict' | 'lax' | 'none') || 'strict',

View File

@ -1,32 +1,33 @@
import { WebSocketGateway, WebSocketServer, OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect, SubscribeMessage, } from '@nestjs/websockets'; import { WebSocketGateway, WebSocketServer, OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect, SubscribeMessage } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io'; import { Server, Socket } from 'socket.io';
import { PrometheusService } from './prometheus.service'; import { PrometheusService } from './prometheus.service';
import { Logger } from '@nestjs/common'; import { Logger } from '@nestjs/common';
@WebSocketGateway({ @WebSocketGateway({
cors: { cors: {
origin: process.env.FRONTEND_URL || 'http://192.168.2.39:5173', // Тот же origin что и в основном CORS origin: process.env.FRONTEND_URL,
methods: ['GET', 'POST'], methods: ['GET', 'POST'],
credentials: true credentials: true
}, },
namespace: '/api/metrics-ws' namespace: '/api/metrics-ws'
}) })
export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() server: Server; @WebSocketServer() server: Server;
private readonly logger = new Logger(MetricsGateway.name); private readonly logger = new Logger(MetricsGateway.name);
private activeSockets: Map<string, Socket> = new Map(); private activeSockets: Map<string, Socket> = new Map();
private metricSubscriptions = new Map<string, {
stopUpdates: () => void;
clients: Set<string>;
}>();
constructor(private readonly prometheusService: PrometheusService) { } constructor(private readonly prometheusService: PrometheusService) { }
afterInit(server: Server) { afterInit(server: Server) {
this.logger.log('WebSocket Gateway initialized'); this.logger.log('WebSocket Gateway initialized');
this.logger.log('WebSocket server initialized successfully');
} }
handleConnection(client: Socket) { handleConnection(client: Socket) {
this.logger.log(`Client connected: ${client.id}`); this.logger.log(`Client connected: ${client.id}`);
this.logger.log(`New client connected: ${client.id} from ${client.handshake.address}`);
this.activeSockets.set(client.id, client); this.activeSockets.set(client.id, client);
} }
@ -37,22 +38,23 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
@SubscribeMessage('get-metrics') @SubscribeMessage('get-metrics')
async handleGetMetrics(client: Socket, payload: any) { async handleGetMetrics(client: Socket, payload: any) {
const { metric, start, end, step, _t } = payload; const { metric, start, end, step, isRangeQuery } = payload;
this.logger.log(`Received metrics request: ${metric}, start: ${start}, end: ${end}, step: ${step}`);
try { try {
// Для запросов с диапазоном - просто возвращаем данные без подписки if (isRangeQuery) {
if (start && end) { // Обработка разового запроса
const data = await this.prometheusService.fetchMetricsRange(metric, start, end, step); const data = await this.prometheusService.fetchMetricsRange(metric, start, end, step);
client.emit('metrics-data', { metric, data }); client.emit(`metrics-range-${metric}`, data);
return; return;
} }
// Для запросов без диапазона (realtime) - запускаем подписку // Исправлено: передаем функцию, а не клиента
const stopUpdates = await this.sendPeriodicUpdates( const stopUpdates = await this.sendPeriodicUpdates(
metric, metric,
step || 5000, // Используем переданный шаг или дефолтный step || 5000,
client (data) => {
client.emit('metrics-data', { metric, data });
}
); );
client.on('disconnect', () => stopUpdates()); client.on('disconnect', () => stopUpdates());
@ -67,59 +69,52 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
} }
} }
@SubscribeMessage('get-metric-types')
async handleGetMetricTypes(client: Socket, payload: { metric: string }) {
try {
const type = await this.prometheusService.fetchMetricType(payload.metric);
const description = await this.prometheusService.fetchMetricDescription(payload.metric);
client.emit('metric-types', {
metric: payload.metric,
type,
description
});
} catch (error) {
this.logger.log(`Error fetching metric types: ${error.message}`);
client.emit('metrics-error', {
metric: payload.metric,
error: error.message
});
}
}
@SubscribeMessage('get-all-metrics')
async handleGetAllMetrics(client: Socket) {
try {
const metrics = await this.prometheusService.fetchAllMetrics();
client.emit('all-metrics', metrics);
} catch (error) {
this.logger.log(`Error fetching all metrics: ${error.message}`);
client.emit('metrics-error', {
error: error.message
});
}
}
@SubscribeMessage('subscribe-metric') @SubscribeMessage('subscribe-metric')
async handleSubscribeMetric(client: Socket, payload: { metric: string, interval?: number }) { async handleSubscribeMetric(client: Socket, payload: { metric: string, interval?: number }) {
const stopUpdates = await this.sendPeriodicUpdates( const { metric } = payload;
payload.metric,
payload.interval || 5000, // Добавляем значение по умолчанию
client // Передаем клиента
);
// Сохраняем функцию остановки для этого клиента if (!this.metricSubscriptions.has(metric)) {
client.on('disconnect', () => stopUpdates()); const stopUpdates = await this.sendPeriodicUpdates(
client.on('unsubscribe-metric', () => stopUpdates()); metric,
payload.interval || 5000,
(data) => {
this.server.emit('metrics-data', { metric, data });
}
);
this.metricSubscriptions.set(metric, {
stopUpdates,
clients: new Set([client.id])
});
} else {
// Исправлено: добавлена проверка на существование подписки
const subscription = this.metricSubscriptions.get(metric);
if (subscription) {
subscription.clients.add(client.id);
}
}
const unsubscribe = () => {
const subscription = this.metricSubscriptions.get(metric);
if (subscription) {
subscription.clients.delete(client.id);
if (subscription.clients.size === 0) {
subscription.stopUpdates();
this.metricSubscriptions.delete(metric);
}
}
};
client.on('disconnect', unsubscribe);
client.on('unsubscribe-metric', unsubscribe);
} }
// Метод для периодической отправки обновлений // Метод для периодической отправки обновлений
async sendPeriodicUpdates(metric: string, interval: number, client: Socket) { async sendPeriodicUpdates(metric: string, interval: number, callback: (data: any) => void) {
const timer = setInterval(async () => { const timer = setInterval(async () => {
try { try {
const data = await this.prometheusService.fetchMetrics(metric); const data = await this.prometheusService.fetchMetrics(metric);
client.emit('metrics-data', { metric, data }); callback(data); // Используем переданный callback
} catch (error) { } catch (error) {
this.logger.error(`Error in periodic update for ${metric}: ${error.message}`); this.logger.error(`Error in periodic update for ${metric}: ${error.message}`);
} }

View File

@ -5,4 +5,5 @@ export interface PrometheusMetric {
value: number; value: number;
type: string; // Тип метрики ("gauge", "counter", и т. д.) type: string; // Тип метрики ("gauge", "counter", и т. д.)
description?: string; // Описание метрики description?: string; // Описание метрики
status?: string; // Добавляем поле для статуса
} }

View File

@ -66,7 +66,8 @@ export class PrometheusService {
timestamp: entry.value[0] * 1000, timestamp: entry.value[0] * 1000,
value: parseFloat(entry.value[1]), value: parseFloat(entry.value[1]),
type: metricType || 'unknown', type: metricType || 'unknown',
description: metricDescription, // Добавляем описание description: metricDescription,
status: entry.metric.status || 'green', // Используем статус из Prometheus или 'green' по умолчанию
})); }));
} }
@ -92,7 +93,8 @@ export class PrometheusService {
timestamp: value[0] * 1000, timestamp: value[0] * 1000,
value: parseFloat(value[1]), value: parseFloat(value[1]),
type: metricType || 'unknown', type: metricType || 'unknown',
description: metricDescription, // Добавляем описание description: metricDescription,
status: entry.metric.status || 'green', // Используем статус из Prometheus или 'green' по умолчанию
})) }))
); );
} }