Compare commits
No commits in common. "0f4f4bcf1505726c80afa60f2d500bf0edc4bb1e" and "a9650366eb200826a80478a4b1236d4faf4d645b" have entirely different histories.
0f4f4bcf15
...
a9650366eb
2
.env
2
.env
|
|
@ -1,7 +1,7 @@
|
||||||
# Прометеус
|
# Прометеус
|
||||||
PROMETHEUS_API=http://192.168.2.34:9090/api/v1
|
PROMETHEUS_API=http://192.168.2.34:9090/api/v1
|
||||||
|
|
||||||
FRONTEND_URL=http://localhost:5173
|
FRONTEND_URL=localhost:5173
|
||||||
|
|
||||||
# Постгресс
|
# Постгресс
|
||||||
DB_HOST=192.168.2.37
|
DB_HOST=192.168.2.37
|
||||||
|
|
|
||||||
|
|
@ -36,8 +36,7 @@
|
||||||
"reflect-metadata": "^0.2.2",
|
"reflect-metadata": "^0.2.2",
|
||||||
"rxjs": "^7.8.1",
|
"rxjs": "^7.8.1",
|
||||||
"socket.io": "^4.8.1",
|
"socket.io": "^4.8.1",
|
||||||
"typeorm": "^0.3.21",
|
"typeorm": "^0.3.21"
|
||||||
"ws": "^8.18.3"
|
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@eslint/eslintrc": "^3.2.0",
|
"@eslint/eslintrc": "^3.2.0",
|
||||||
|
|
@ -5957,27 +5956,6 @@
|
||||||
"node": ">= 0.6"
|
"node": ">= 0.6"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/engine.io/node_modules/ws": {
|
|
||||||
"version": "8.17.1",
|
|
||||||
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz",
|
|
||||||
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
|
|
||||||
"license": "MIT",
|
|
||||||
"engines": {
|
|
||||||
"node": ">=10.0.0"
|
|
||||||
},
|
|
||||||
"peerDependencies": {
|
|
||||||
"bufferutil": "^4.0.1",
|
|
||||||
"utf-8-validate": ">=5.0.2"
|
|
||||||
},
|
|
||||||
"peerDependenciesMeta": {
|
|
||||||
"bufferutil": {
|
|
||||||
"optional": true
|
|
||||||
},
|
|
||||||
"utf-8-validate": {
|
|
||||||
"optional": true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"node_modules/enhanced-resolve": {
|
"node_modules/enhanced-resolve": {
|
||||||
"version": "5.18.1",
|
"version": "5.18.1",
|
||||||
"resolved": "https://registry.npmjs.org/enhanced-resolve/-/enhanced-resolve-5.18.1.tgz",
|
"resolved": "https://registry.npmjs.org/enhanced-resolve/-/enhanced-resolve-5.18.1.tgz",
|
||||||
|
|
@ -11006,27 +10984,6 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/socket.io-adapter/node_modules/ws": {
|
|
||||||
"version": "8.17.1",
|
|
||||||
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz",
|
|
||||||
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
|
|
||||||
"license": "MIT",
|
|
||||||
"engines": {
|
|
||||||
"node": ">=10.0.0"
|
|
||||||
},
|
|
||||||
"peerDependencies": {
|
|
||||||
"bufferutil": "^4.0.1",
|
|
||||||
"utf-8-validate": ">=5.0.2"
|
|
||||||
},
|
|
||||||
"peerDependenciesMeta": {
|
|
||||||
"bufferutil": {
|
|
||||||
"optional": true
|
|
||||||
},
|
|
||||||
"utf-8-validate": {
|
|
||||||
"optional": true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"node_modules/socket.io-parser": {
|
"node_modules/socket.io-parser": {
|
||||||
"version": "4.2.4",
|
"version": "4.2.4",
|
||||||
"resolved": "https://registry.npmmirror.com/socket.io-parser/-/socket.io-parser-4.2.4.tgz",
|
"resolved": "https://registry.npmmirror.com/socket.io-parser/-/socket.io-parser-4.2.4.tgz",
|
||||||
|
|
@ -12961,9 +12918,9 @@
|
||||||
"license": "ISC"
|
"license": "ISC"
|
||||||
},
|
},
|
||||||
"node_modules/ws": {
|
"node_modules/ws": {
|
||||||
"version": "8.18.3",
|
"version": "8.17.1",
|
||||||
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.18.3.tgz",
|
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz",
|
||||||
"integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==",
|
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"engines": {
|
"engines": {
|
||||||
"node": ">=10.0.0"
|
"node": ">=10.0.0"
|
||||||
|
|
|
||||||
43
package.json
43
package.json
|
|
@ -20,35 +20,34 @@
|
||||||
"test:e2e": "jest --config ./test/jest-e2e.json"
|
"test:e2e": "jest --config ./test/jest-e2e.json"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@clickhouse/client": "^1.11.2",
|
|
||||||
"@clickhouse/client-web": "^1.11.2",
|
|
||||||
"@nestjs/axios": "^4.0.0",
|
"@nestjs/axios": "^4.0.0",
|
||||||
"@nestjs/common": "^11.0.1",
|
"@nestjs/common": "^11.0.1",
|
||||||
"@nestjs/config": "^4.0.0",
|
|
||||||
"@nestjs/core": "^11.0.1",
|
"@nestjs/core": "^11.0.1",
|
||||||
"@nestjs/jwt": "^11.0.0",
|
"@nestjs/config": "^4.0.0",
|
||||||
"@nestjs/passport": "^11.0.5",
|
|
||||||
"@nestjs/platform-express": "^11.0.1",
|
"@nestjs/platform-express": "^11.0.1",
|
||||||
"@nestjs/platform-socket.io": "11.0.12",
|
|
||||||
"@nestjs/swagger": "11.1.4",
|
|
||||||
"@nestjs/typeorm": "^11.0.0",
|
|
||||||
"@nestjs/websockets": "11.0.12",
|
|
||||||
"@types/bcrypt": "^5.0.2",
|
|
||||||
"@types/cookie-parser": "^1.4.8",
|
|
||||||
"@types/passport-jwt": "^4.0.1",
|
|
||||||
"axios": "^1.7.9",
|
"axios": "^1.7.9",
|
||||||
"bcrypt": "^5.1.1",
|
"reflect-metadata": "^0.2.2",
|
||||||
"cookie-parser": "^1.4.7",
|
|
||||||
"date-fns": "4.1.0",
|
|
||||||
"dotenv": "^16.3.1",
|
"dotenv": "^16.3.1",
|
||||||
|
"rxjs": "^7.8.1",
|
||||||
|
"@nestjs/typeorm": "^11.0.0",
|
||||||
|
"pg": "^8.14.1",
|
||||||
|
"typeorm": "^0.3.21",
|
||||||
|
"bcrypt": "^5.1.1",
|
||||||
|
"@types/bcrypt": "^5.0.2",
|
||||||
|
"socket.io": "^4.8.1",
|
||||||
|
"@nestjs/websockets": "11.0.12",
|
||||||
|
"@nestjs/platform-socket.io": "11.0.12",
|
||||||
"passport": "^0.7.0",
|
"passport": "^0.7.0",
|
||||||
"passport-jwt": "^4.0.1",
|
"passport-jwt": "^4.0.1",
|
||||||
"pg": "^8.14.1",
|
"cookie-parser": "^1.4.7",
|
||||||
"reflect-metadata": "^0.2.2",
|
"@types/passport-jwt": "^4.0.1",
|
||||||
"rxjs": "^7.8.1",
|
"@types/cookie-parser": "^1.4.8",
|
||||||
"socket.io": "^4.8.1",
|
"@nestjs/jwt": "^11.0.0",
|
||||||
"typeorm": "^0.3.21",
|
"@nestjs/passport": "^11.0.5",
|
||||||
"ws": "^8.18.3"
|
"@nestjs/swagger": "11.1.4",
|
||||||
|
"@clickhouse/client": "^1.11.2",
|
||||||
|
"date-fns": "4.1.0",
|
||||||
|
"@clickhouse/client-web": "^1.11.2"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@eslint/eslintrc": "^3.2.0",
|
"@eslint/eslintrc": "^3.2.0",
|
||||||
|
|
@ -94,4 +93,4 @@
|
||||||
"coverageDirectory": "../coverage",
|
"coverageDirectory": "../coverage",
|
||||||
"testEnvironment": "node"
|
"testEnvironment": "node"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1,391 +1,253 @@
|
||||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
import {
|
||||||
import { Server, WebSocket } from 'ws';
|
WebSocketGateway,
|
||||||
import { createServer } from 'http';
|
WebSocketServer,
|
||||||
|
OnGatewayInit,
|
||||||
|
OnGatewayConnection,
|
||||||
|
OnGatewayDisconnect,
|
||||||
|
SubscribeMessage,
|
||||||
|
} from '@nestjs/websockets';
|
||||||
|
import { Server, Socket } from 'socket.io';
|
||||||
import { PrometheusService } from './prometheus.service';
|
import { PrometheusService } from './prometheus.service';
|
||||||
import { ConfigService } from '@nestjs/config';
|
import { Logger } from '@nestjs/common';
|
||||||
|
|
||||||
type Filters = Record<string, string>;
|
@WebSocketGateway({
|
||||||
|
cors: {
|
||||||
@Injectable()
|
origin: process.env.FRONTEND_URL,
|
||||||
export class MetricsGateway implements OnModuleInit {
|
methods: ['GET', 'POST'],
|
||||||
|
credentials: true
|
||||||
|
},
|
||||||
|
namespace: '/api/metrics-ws'
|
||||||
|
})
|
||||||
|
export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
|
||||||
|
@WebSocketServer() server: Server;
|
||||||
private readonly logger = new Logger(MetricsGateway.name);
|
private readonly logger = new Logger(MetricsGateway.name);
|
||||||
private wss: Server;
|
private activeSockets: Map<string, Socket> = new Map();
|
||||||
|
private metricSubscriptions = new Map<string, {
|
||||||
|
stopUpdates: () => void;
|
||||||
|
clients: Set<string>;
|
||||||
|
}>();
|
||||||
|
private lastSentData = new Map<string, any>(); // Кэш последних отправленных данных
|
||||||
|
|
||||||
private activeSockets = new Map<string, WebSocket>();
|
constructor(private readonly prometheusService: PrometheusService) { }
|
||||||
private metricSubscriptions = new Map<
|
|
||||||
string,
|
|
||||||
{ stopUpdates: () => void; clients: Set<string> }
|
|
||||||
>();
|
|
||||||
|
|
||||||
private lastSentData = new Map<string, any>();
|
afterInit(server: Server) {
|
||||||
|
this.logger.log('WebSocket Gateway initialized');
|
||||||
constructor(
|
|
||||||
private readonly prometheusService: PrometheusService,
|
|
||||||
private readonly configService: ConfigService
|
|
||||||
) { }
|
|
||||||
|
|
||||||
onModuleInit() {
|
|
||||||
const httpServer = createServer();
|
|
||||||
this.wss = new Server({
|
|
||||||
server: httpServer,
|
|
||||||
path: '/metrics-ws',
|
|
||||||
});
|
|
||||||
|
|
||||||
this.wss.on('connection', (client, request) =>
|
|
||||||
this.handleConnection(client, request)
|
|
||||||
);
|
|
||||||
this.wss.on('error', (err) =>
|
|
||||||
this.logger.error('WebSocket server error:', err)
|
|
||||||
);
|
|
||||||
|
|
||||||
const wsPort = Number(this.configService.get('WS_PORT') || 3001);
|
|
||||||
httpServer.listen(wsPort, () => {
|
|
||||||
this.logger.log(
|
|
||||||
`WebSocket server running at ws://localhost:${wsPort}/api/metrics-ws`
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handleConnection(client: Socket) {
|
||||||
private handleConnection(client: WebSocket, request: any) {
|
this.logger.log(`Client connected: ${client.id}`);
|
||||||
let clientId =
|
this.activeSockets.set(client.id, client);
|
||||||
this.getQueryParams(request?.url).clientId ||
|
|
||||||
Math.random().toString(36).slice(2);
|
|
||||||
|
|
||||||
this.activeSockets.set(clientId, client);
|
|
||||||
this.logger.log(`Client connected: ${clientId}`);
|
|
||||||
|
|
||||||
client.on('message', (raw) => {
|
|
||||||
try {
|
|
||||||
const msg = JSON.parse(raw.toString());
|
|
||||||
this.handleMessage(clientId, client, msg);
|
|
||||||
} catch (err) {
|
|
||||||
this.sendError(client, 'Invalid JSON');
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
client.on('close', () => this.cleanupClient(clientId));
|
|
||||||
client.on('error', (err) => {
|
|
||||||
this.logger.error(`Client ${clientId} error:`, err);
|
|
||||||
this.cleanupClient(clientId);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private handleMessage(clientId: string, client: WebSocket, message: any) {
|
handleDisconnect(client: Socket) {
|
||||||
const { event, data } = message || {};
|
this.logger.log(`Client disconnected: ${client.id}`);
|
||||||
if (!event) return this.sendError(client, 'Event type is required');
|
this.activeSockets.delete(client.id);
|
||||||
|
|
||||||
switch (event) {
|
// Очистка всех подписок этого клиента
|
||||||
case 'unsubscribe-all':
|
for (const [metric, subscription] of this.metricSubscriptions) {
|
||||||
return this.unsubscribeAllForClient(clientId);
|
subscription.clients.delete(client.id);
|
||||||
|
if (subscription.clients.size === 0) {
|
||||||
case 'get-metrics':
|
subscription.stopUpdates();
|
||||||
return this.handleGetMetrics(client, data);
|
this.metricSubscriptions.delete(metric);
|
||||||
|
this.lastSentData.delete(metric);
|
||||||
case 'subscribe-metric':
|
|
||||||
return this.handleSubscribeMetric(clientId, client, data);
|
|
||||||
|
|
||||||
case 'unsubscribe-metric':
|
|
||||||
return this.handleUnsubscribeMetric(clientId, data);
|
|
||||||
|
|
||||||
default:
|
|
||||||
return this.sendError(client, `Unknown event type: ${event}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private cleanupClient(clientId: string) {
|
|
||||||
if (!this.activeSockets.has(clientId)) return;
|
|
||||||
this.logger.log(`Client disconnected: ${clientId}`);
|
|
||||||
this.activeSockets.delete(clientId);
|
|
||||||
|
|
||||||
setTimeout(() => {
|
|
||||||
if (!this.activeSockets.has(clientId)) {
|
|
||||||
}
|
|
||||||
}, 5000);
|
|
||||||
|
|
||||||
for (const [key, sub] of this.metricSubscriptions) {
|
|
||||||
sub.clients.delete(clientId);
|
|
||||||
if (sub.clients.size === 0) {
|
|
||||||
sub.stopUpdates();
|
|
||||||
this.metricSubscriptions.delete(key);
|
|
||||||
this.lastSentData.delete(key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private unsubscribeAllForClient(clientId: string) {
|
@SubscribeMessage('unsubscribe-all')
|
||||||
for (const [key, sub] of this.metricSubscriptions) {
|
handleUnsubscribeAll(client: Socket) {
|
||||||
if (sub.clients.has(clientId)) sub.clients.delete(clientId);
|
for (const [metric, subscription] of this.metricSubscriptions) {
|
||||||
if (sub.clients.size === 0) {
|
subscription.clients.delete(client.id);
|
||||||
sub.stopUpdates();
|
if (subscription.clients.size === 0) {
|
||||||
this.metricSubscriptions.delete(key);
|
subscription.stopUpdates();
|
||||||
this.lastSentData.delete(key);
|
this.metricSubscriptions.delete(metric);
|
||||||
|
this.lastSentData.delete(metric);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SubscribeMessage('get-metrics')
|
||||||
private async handleGetMetrics(client: WebSocket, payload: any) {
|
async handleGetMetrics(client: Socket, payload: any) {
|
||||||
const {
|
const { metric, start, end, step, isRangeQuery, requestId, filters = {} } = payload;
|
||||||
metric,
|
|
||||||
start,
|
|
||||||
end,
|
|
||||||
step,
|
|
||||||
isRangeQuery,
|
|
||||||
requestId,
|
|
||||||
filters = {},
|
|
||||||
} = payload || {};
|
|
||||||
|
|
||||||
if (!metric) {
|
if (!metric) {
|
||||||
return this.sendError(client, 'Metric name is required', requestId);
|
client.emit('metrics-error', {
|
||||||
|
error: 'Metric name is required',
|
||||||
|
requestId
|
||||||
|
});
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isRangeQuery) {
|
if (isRangeQuery) {
|
||||||
try {
|
try {
|
||||||
const rangeData = await this.prometheusService.fetchMetricsRange(
|
const data = await this.prometheusService.fetchMetricsRange(metric, start, end, step, filters);
|
||||||
metric,
|
client.emit('metrics-data', { metric, data, requestId });
|
||||||
start,
|
return;
|
||||||
end,
|
} catch (error) {
|
||||||
step,
|
client.emit('metrics-error', {
|
||||||
filters
|
error: error.message,
|
||||||
);
|
requestId
|
||||||
this.logger.debug('RangeQuery result', JSON.stringify(rangeData).slice(0, 200));
|
|
||||||
|
|
||||||
return this.sendMessage(client, {
|
|
||||||
event: 'metrics-data',
|
|
||||||
data: rangeData,
|
|
||||||
metric,
|
|
||||||
requestId,
|
|
||||||
});
|
});
|
||||||
} catch (err: any) {
|
return;
|
||||||
return this.sendError(client, err?.message || 'Range query error', requestId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const subscriptionKey = this.getSubscriptionKey(metric, filters);
|
const subscriptionKey = this.getSubscriptionKey(metric, filters);
|
||||||
|
// Отправляем текущие данные сразу при запросе
|
||||||
const initialData =
|
const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||||
await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
client.emit('metrics-data', { metric, data: initialData, requestId });
|
||||||
|
this.lastSentData.set(subscriptionKey, initialData);
|
||||||
this.sendMessage(client, {
|
|
||||||
event: 'metrics-data',
|
|
||||||
data: { metric, data: initialData, requestId },
|
|
||||||
});
|
|
||||||
|
|
||||||
let lastLocal = initialData;
|
|
||||||
|
|
||||||
const jitter = Math.floor(Math.random() * 5000);
|
|
||||||
setTimeout(() => {
|
|
||||||
const timer = setInterval(async () => {
|
|
||||||
try {
|
|
||||||
const fresh =
|
|
||||||
await this.prometheusService.fetchMetricsWithFilters(
|
|
||||||
metric,
|
|
||||||
filters
|
|
||||||
);
|
|
||||||
if (!this.isDataEqual(lastLocal, fresh)) {
|
|
||||||
this.sendMessage(client, {
|
|
||||||
event: 'metrics-data',
|
|
||||||
data: { metric, data: fresh, requestId },
|
|
||||||
});
|
|
||||||
lastLocal = fresh;
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
this.logger.error(
|
|
||||||
`Error in on-demand periodic update for ${subscriptionKey}:`,
|
|
||||||
(e as any)?.message
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}, step || 5000);
|
|
||||||
|
|
||||||
const closeOnce = () => clearInterval(timer);
|
|
||||||
client.once('close', closeOnce);
|
|
||||||
client.once('error', closeOnce);
|
|
||||||
}, jitter);
|
|
||||||
} catch (err: any) {
|
|
||||||
return this.sendError(client, err?.message || 'get-metrics error', requestId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async handleSubscribeMetric(clientId: string, client: WebSocket, payload: any) {
|
|
||||||
const { metric, interval = 60000, filters = {} } = payload || {};
|
|
||||||
if (!metric) {
|
|
||||||
this.sendError(client, 'Metric name is required');
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const key = this.getSubscriptionKey(metric, filters);
|
|
||||||
|
|
||||||
try {
|
|
||||||
const initial = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
|
||||||
|
|
||||||
if (!Array.isArray(initial)) {
|
|
||||||
throw new Error(`Expected array for metric ${metric}, got ${typeof initial}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.sendMessage(client, {
|
|
||||||
event: 'metrics-data',
|
|
||||||
data: {
|
|
||||||
metric: key,
|
|
||||||
data: initial,
|
|
||||||
type: 'initial'
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
this.lastSentData.set(key, initial);
|
|
||||||
|
|
||||||
const stopUpdates = await this.sendPeriodicUpdates(
|
const stopUpdates = await this.sendPeriodicUpdates(
|
||||||
metric,
|
metric,
|
||||||
interval,
|
step || 5000,
|
||||||
(freshData) => {
|
(data) => {
|
||||||
if (!Array.isArray(freshData)) {
|
const lastData = this.lastSentData.get(subscriptionKey);
|
||||||
this.logger.error(`Periodic update: expected array for ${key}, got ${typeof freshData}`);
|
if (!this.isDataEqual(lastData, data)) {
|
||||||
return;
|
client.emit('metrics-data', { metric, data, requestId });
|
||||||
}
|
this.lastSentData.set(subscriptionKey, data);
|
||||||
|
|
||||||
const lastData = this.lastSentData.get(key);
|
|
||||||
if (!this.isDataEqual(lastData, freshData)) {
|
|
||||||
this.broadcast({
|
|
||||||
event: 'metrics-data',
|
|
||||||
data: {
|
|
||||||
metric: key,
|
|
||||||
data: freshData,
|
|
||||||
type: 'update'
|
|
||||||
}
|
|
||||||
});
|
|
||||||
this.lastSentData.set(key, freshData);
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
filters
|
filters
|
||||||
);
|
);
|
||||||
|
|
||||||
this.metricSubscriptions.set(key, {
|
const cleanup = () => {
|
||||||
stopUpdates,
|
stopUpdates();
|
||||||
clients: new Set([clientId]),
|
this.lastSentData.delete(subscriptionKey);
|
||||||
});
|
client.off('disconnect', cleanup);
|
||||||
|
client.off('unsubscribe-metric', cleanup);
|
||||||
const unsubscribe = () => {
|
|
||||||
this.logger.log(`Unsubscribing client ${clientId} from ${key}`);
|
|
||||||
const subscription = this.metricSubscriptions.get(key);
|
|
||||||
if (subscription) {
|
|
||||||
subscription.clients.delete(clientId);
|
|
||||||
if (subscription.clients.size === 0) {
|
|
||||||
subscription.stopUpdates();
|
|
||||||
this.metricSubscriptions.delete(key);
|
|
||||||
this.lastSentData.delete(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
client.on('close', unsubscribe);
|
client.on('disconnect', cleanup);
|
||||||
client.on('error', unsubscribe);
|
client.on('unsubscribe-metric', cleanup);
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error(`Subscription error for ${key}:`, error);
|
client.emit('metrics-error', {
|
||||||
this.sendError(client, error.message);
|
error: error.message,
|
||||||
|
requestId
|
||||||
if (!this.metricSubscriptions.has(key)) {
|
});
|
||||||
this.lastSentData.delete(key);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private handleUnsubscribeMetric(
|
private getSubscriptionKey(metric: string, filters: Record<string, string>): string {
|
||||||
clientId: string,
|
const filterKeys = Object.keys(filters).sort();
|
||||||
payload: { metric: string; filters?: Filters }
|
const filterString = filterKeys.map(k => `${k}=${encodeURIComponent(filters[k])}`).join('&');
|
||||||
) {
|
|
||||||
const { metric, filters = {} } = payload || {};
|
|
||||||
if (!metric) return;
|
|
||||||
|
|
||||||
const key = this.getSubscriptionKey(metric, filters);
|
|
||||||
const sub = this.metricSubscriptions.get(key);
|
|
||||||
if (!sub) return;
|
|
||||||
|
|
||||||
sub.clients.delete(clientId);
|
|
||||||
if (sub.clients.size === 0) {
|
|
||||||
sub.stopUpdates();
|
|
||||||
this.metricSubscriptions.delete(key);
|
|
||||||
this.lastSentData.delete(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private getQueryParams(rawUrl?: string): Record<string, string> {
|
|
||||||
try {
|
|
||||||
const url = new URL(rawUrl || '', 'http://localhost'); // безопасная база
|
|
||||||
return Object.fromEntries(url.searchParams.entries());
|
|
||||||
} catch {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private getSubscriptionKey(metric: string, filters: Filters): string {
|
|
||||||
const keys = Object.keys(filters).sort();
|
|
||||||
const filterString = keys
|
|
||||||
.map((k) => `${k}=${encodeURIComponent(filters[k])}`)
|
|
||||||
.join('&');
|
|
||||||
return `${metric}${filterString ? `?${filterString}` : ''}`;
|
return `${metric}${filterString ? `?${filterString}` : ''}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
private isDataEqual(a: any[], b: any[]) {
|
// Сравниваем данные, чтобы избежать лишних отправок
|
||||||
if (!a || !b || a.length !== b.length) return false;
|
private isDataEqual(oldData: any[], newData: any[]): boolean {
|
||||||
return a.every((item, i) => {
|
if (!oldData || !newData || oldData.length !== newData.length) return false;
|
||||||
const x = b[i];
|
|
||||||
|
return oldData.every((oldItem, index) => {
|
||||||
|
const newItem = newData[index];
|
||||||
return (
|
return (
|
||||||
item?.value === x?.value &&
|
oldItem.value === newItem.value &&
|
||||||
item?.status === x?.status &&
|
oldItem.status === newItem.status &&
|
||||||
item?.timestamp === x?.timestamp
|
oldItem.timestamp === newItem.timestamp
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private async sendPeriodicUpdates(
|
@SubscribeMessage('subscribe-metric')
|
||||||
|
async handleSubscribeMetric(
|
||||||
|
client: Socket,
|
||||||
|
payload: {
|
||||||
|
metric: string;
|
||||||
|
interval?: number;
|
||||||
|
filters?: Record<string, string>;
|
||||||
|
}
|
||||||
|
) {
|
||||||
|
const { metric, interval = 60000, filters = {} } = payload; // По умолчанию 60 секунд
|
||||||
|
const subscriptionKey = this.getSubscriptionKey(metric, filters);
|
||||||
|
|
||||||
|
if (!this.metricSubscriptions.has(subscriptionKey)) {
|
||||||
|
// Отправляем текущие данные сразу при подписке
|
||||||
|
try {
|
||||||
|
const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||||
|
client.emit('metrics-data', {
|
||||||
|
metric: subscriptionKey,
|
||||||
|
data: initialData
|
||||||
|
});
|
||||||
|
this.lastSentData.set(subscriptionKey, initialData);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(`Error fetching initial data for ${metric}:`, error.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
const stopUpdates = await this.sendPeriodicUpdates(
|
||||||
|
metric,
|
||||||
|
interval,
|
||||||
|
(data) => {
|
||||||
|
// Отправляем только если данные изменились
|
||||||
|
const lastData = this.lastSentData.get(subscriptionKey);
|
||||||
|
if (!this.isDataEqual(lastData, data)) {
|
||||||
|
this.server.emit('metrics-data', {
|
||||||
|
metric: subscriptionKey,
|
||||||
|
data
|
||||||
|
});
|
||||||
|
this.lastSentData.set(subscriptionKey, data);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
filters
|
||||||
|
);
|
||||||
|
|
||||||
|
this.metricSubscriptions.set(subscriptionKey, {
|
||||||
|
stopUpdates,
|
||||||
|
clients: new Set([client.id])
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.metricSubscriptions.get(subscriptionKey)?.clients.add(client.id);
|
||||||
|
// Отправляем кэшированные данные новому клиенту
|
||||||
|
const cachedData = this.lastSentData.get(subscriptionKey);
|
||||||
|
if (cachedData) {
|
||||||
|
client.emit('metrics-data', {
|
||||||
|
metric: subscriptionKey,
|
||||||
|
data: cachedData
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const unsubscribe = () => {
|
||||||
|
const subscription = this.metricSubscriptions.get(subscriptionKey);
|
||||||
|
if (subscription) {
|
||||||
|
subscription.clients.delete(client.id);
|
||||||
|
if (subscription.clients.size === 0) {
|
||||||
|
subscription.stopUpdates();
|
||||||
|
this.metricSubscriptions.delete(subscriptionKey);
|
||||||
|
this.lastSentData.delete(subscriptionKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
client.on('disconnect', unsubscribe);
|
||||||
|
client.on('unsubscribe-metric', unsubscribe);
|
||||||
|
}
|
||||||
|
|
||||||
|
async sendPeriodicUpdates(
|
||||||
metric: string,
|
metric: string,
|
||||||
interval: number,
|
interval: number,
|
||||||
cb: (data: any) => void,
|
callback: (data: any) => void,
|
||||||
filters: Filters
|
filters: Record<string, string> = {}
|
||||||
) {
|
) {
|
||||||
|
// Добавляем небольшую случайную задержку, чтобы избежать пиковой нагрузки
|
||||||
const initialDelay = Math.floor(Math.random() * 5000);
|
const initialDelay = Math.floor(Math.random() * 5000);
|
||||||
await new Promise((r) => setTimeout(r, initialDelay));
|
|
||||||
|
await new Promise(resolve => setTimeout(resolve, initialDelay));
|
||||||
|
|
||||||
const timer = setInterval(async () => {
|
const timer = setInterval(async () => {
|
||||||
try {
|
try {
|
||||||
const data = await this.prometheusService.fetchMetricsWithFilters(
|
const data = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||||
metric,
|
callback(data);
|
||||||
filters
|
} catch (error) {
|
||||||
);
|
this.logger.error(`Error in periodic update for ${metric}:`, error.message);
|
||||||
cb(data);
|
|
||||||
} catch (e: any) {
|
|
||||||
this.logger.error(
|
|
||||||
`Error in periodic update for ${metric}:`,
|
|
||||||
e?.message
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}, interval);
|
}, interval);
|
||||||
|
|
||||||
return () => clearInterval(timer);
|
return () => {
|
||||||
|
clearInterval(timer);
|
||||||
|
this.lastSentData.delete(this.getSubscriptionKey(metric, filters));
|
||||||
|
this.logger.log(`Stopped updates for ${metric}`);
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
}
|
||||||
private sendMessage(client: WebSocket, message: any) {
|
|
||||||
if (client.readyState === WebSocket.OPEN) {
|
|
||||||
client.send(JSON.stringify(message));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private broadcast(message: any) {
|
|
||||||
const raw = JSON.stringify(message);
|
|
||||||
this.wss.clients.forEach((c) => {
|
|
||||||
if (c.readyState === WebSocket.OPEN) c.send(raw);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private sendError(client: WebSocket, error: string, requestId?: string) {
|
|
||||||
this.sendMessage(client, {
|
|
||||||
event: 'metrics-error',
|
|
||||||
data: { error, requestId },
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
Loading…
Reference in New Issue