diff --git a/logs.txt b/logs.txt new file mode 100644 index 0000000..134fcc4 Binary files /dev/null and b/logs.txt differ diff --git a/src/MenuItem.interface.ts b/src/MenuItem.interface.ts deleted file mode 100644 index 102f903..0000000 --- a/src/MenuItem.interface.ts +++ /dev/null @@ -1,10 +0,0 @@ -export interface MenuItem { - id: string; - title: string; - items?: MenuItem[]; - metric?: string; - filters?: { - device: string; - source_id: string; - }; -} \ No newline at end of file diff --git a/src/menu/menu.controller.ts b/src/menu/menu.controller.ts index 197e747..d7b0ea4 100644 --- a/src/menu/menu.controller.ts +++ b/src/menu/menu.controller.ts @@ -1,4 +1,4 @@ -import { Controller, Get, Post, Put, Body, Param, Headers, HttpException, HttpStatus } from '@nestjs/common'; +import { Controller, Get, Post, Put, Body, Param, Headers, HttpException, HttpStatus, Delete } from '@nestjs/common'; import { MenuService } from './menu.service'; import { MenuItem } from './menu.interface'; @@ -44,24 +44,72 @@ export class MenuController { return { hasUpdates }; } - @Post('save') - async saveMenu() { - await this.menuService.saveMenuToFile(); - return { status: 'saved' }; - } + // @Post('save') + // async saveMenu() { + // await this.menuService.saveMenuToFile(); + // return { status: 'saved' }; + // } - @Post('overrides') - async saveOverrides(@Body() data: { overrides: Partial[] }) { - await this.menuService.saveOverrides(data.overrides); - return { status: 'success' }; - } + // @Post('overrides') + // async saveOverrides(@Body() data: { overrides: Partial[] }) { + // await this.menuService.saveOverrides(data.overrides); + // return { status: 'success' }; + // } @Put(':id') async updateMenuItem( @Param('id') id: string, @Body() update: Partial ) { - const updatedItem = await this.menuService.updateMenuItem(id, update); - return updatedItem; + try { + const updatedItem = await this.menuService.updateMenuItem(id, update); + return updatedItem; + } catch (error) { + throw new HttpException( + error.message || 'Failed to update menu item', + HttpStatus.INTERNAL_SERVER_ERROR + ); + } } -} \ No newline at end of file + + @Delete('items/:id') + async deleteMenuItem(@Param('id') id: string) { + console.log(`DELETE /menu/items/${id} requested`); + try { + await this.menuService.hideMenuItem(id); + console.log(`Item ${id} hidden successfully`); + return { success: true, message: 'Item hidden successfully' }; + } catch (error) { + console.error(`Error hiding item ${id}:`, error); + throw new HttpException( + error.message || 'Failed to hide menu item', + HttpStatus.INTERNAL_SERVER_ERROR + ); + } + } + + @Post('invalidate-cache') + async invalidateCache() { + try { + this.menuService.invalidateCache(); + return { success: true, message: 'Cache invalidated successfully' }; + } catch (error) { + throw new HttpException( + error.message || 'Failed to invalidate cache', + HttpStatus.INTERNAL_SERVER_ERROR + ); + } + } + + @Get('debug/overrides') + async debugOverrides() { + try { + return { status: 'Debug endpoint not implemented' }; + } catch (error) { + throw new HttpException( + error.message || 'Debug failed', + HttpStatus.INTERNAL_SERVER_ERROR + ); + } + } +} diff --git a/src/menu/menu.interface.ts b/src/menu/menu.interface.ts index 6608d56..f2b447c 100644 --- a/src/menu/menu.interface.ts +++ b/src/menu/menu.interface.ts @@ -11,4 +11,5 @@ export interface MenuItem { max: number; status: number; }>; + hidden?: boolean; } \ No newline at end of file diff --git a/src/menu/menu-overrides.json b/src/menu/menu.json similarity index 100% rename from src/menu/menu-overrides.json rename to src/menu/menu.json diff --git a/src/menu/menu.service.ts b/src/menu/menu.service.ts index 7d88842..d07ec6c 100644 --- a/src/menu/menu.service.ts +++ b/src/menu/menu.service.ts @@ -10,29 +10,28 @@ export class MenuService { private menuCache: MenuItem | null = null; private lastModified: Date | null = null; private cacheInitialized = false; + private userOverrides: Map> = new Map(); constructor( private readonly prometheusService: PrometheusService, private readonly rangeService: RangeService ) { } - private readonly menuOverridesPath = path.join(process.cwd(), 'data', 'menu.json'); - async saveMenuToFile(): Promise { - const { menu } = await this.getFullMenuWithCache(); - await fs.mkdir(path.dirname(this.menuOverridesPath), { recursive: true }); - await fs.writeFile(this.menuOverridesPath, JSON.stringify(menu, null, 2), 'utf-8'); - } + private readonly userOverridesPath = path.join(process.cwd(), 'data', 'user_menu_overrides.json'); + async getFullMenuWithCache(ifModifiedSince?: string): Promise<{ menu: MenuItem; fresh: boolean }> { if (this.menuCache && this.lastModified && (!ifModifiedSince || new Date(ifModifiedSince) >= this.lastModified)) { return { menu: this.menuCache, fresh: false }; } + await this.loadUserOverrides(); + const dynamicItemsPromise = this.generateDynamicItems(); const baseMenu = await this.injectDynamicItems(this.getStaticStructure(), dynamicItemsPromise); - const overrides = await this.loadOverrides(); - const freshMenu = this.applyOverrides(baseMenu, overrides); + + const freshMenu = this.applyUserOverrides(baseMenu); this.menuCache = freshMenu; this.lastModified = new Date(); @@ -48,33 +47,75 @@ export class MenuService { return !this.lastModified || new Date(ifModifiedSince) < this.lastModified; } - private applyOverrides(menu: MenuItem, overrides: Partial[]): MenuItem { - const overrideMap = new Map(overrides.map(o => [o.id, o])); + async hideMenuItem(id: string): Promise { + this.userOverrides.set(id, { id, hidden: true }); + await this.saveUserOverrides(); + this.invalidateCache(); + } - const apply = (item: MenuItem): MenuItem => { - const override = overrideMap.get(item.id); - const updated = override ? { ...item, ...override } : item; + private applyUserOverrides(menu: MenuItem): MenuItem { + const apply = (item: MenuItem): MenuItem | null => { + const override = this.userOverrides.get(item.id); + + if (override?.hidden) { + return null; + } + + const updated: MenuItem = { + ...item, + ...override, + hidden: undefined + }; if (updated.items) { - updated.items = updated.items.map(apply); + const processedItems = updated.items + .map(apply) + .filter((item): item is MenuItem => item !== null); + + updated.items = processedItems.length > 0 ? processedItems : undefined; } return updated; }; - return apply(menu); + const result = apply(menu); + return result || { title: menu.title, id: menu.id, items: [] }; } - private async loadOverrides(): Promise[]> { + private async loadUserOverrides(): Promise { try { - const content = await fs.readFile(this.menuOverridesPath, 'utf-8'); + const content = await fs.readFile(this.userOverridesPath, 'utf-8'); const parsed = JSON.parse(content); - return parsed.overrides || []; + + this.userOverrides = new Map( + (parsed.overrides || []).map(o => [o.id, o]) + ); } catch (e) { - return []; + this.userOverrides = new Map(); + await this.saveUserOverrides(); // Создаем файл с пустыми данными } } + + private async saveUserOverrides(): Promise { + try { + await fs.mkdir(path.dirname(this.userOverridesPath), { recursive: true }); + const overridesArray = Array.from(this.userOverrides.values()); + await fs.writeFile( + this.userOverridesPath, + JSON.stringify({ overrides: overridesArray }, null, 2), + 'utf-8' + ); + } catch (error) { + console.error('Error saving user overrides:', error); + throw new HttpException( + 'Failed to save user preferences', + HttpStatus.INTERNAL_SERVER_ERROR + ); + } + } + + private getStaticStructure(): MenuItem { return { title: "ЗВКС", @@ -166,9 +207,12 @@ export class MenuService { metadataMap: Map ): Promise { const moduleItems = await this.generateModuleItems(device, seriesData, metadataMap); + + const deviceName = metadataMap.get(device) ?? device; + return { id: `device_${device}`, - title: `Graviton S2082I (${device})`, + title: deviceName, items: moduleItems, isDynamic: true }; @@ -187,41 +231,175 @@ export class MenuService { private normalizeIdPart(part: string): string { return part .replace(/\$/g, '_') - .replace(/[^a-zA-Z0-9-_]/g, ''); + .replace(/,/g, '_') + .replace(/\s+/g, '_') + .replace(/[^a-zA-Z0-9-_]/g, '') + .toLowerCase(); } + + // private async generateModuleItems( + // device: string, + // seriesData: { metric: string; labels: Record }[], + // metadataMap: Map + // ): Promise { + // const modules = new Map(); + + // seriesData.forEach(({ labels }) => { + // if (labels.device === device && labels.source_id) { + // const sourceId = labels.source_id; + // let displayName = sourceId; + + // if (sourceId.startsWith('module$')) { + // displayName = `Module ${sourceId.split('$')[1]}`; + // } else if (sourceId.startsWith('port$')) { + // displayName = `Port ${sourceId.split('$')[1]}`; + // } + + // modules.set(sourceId, displayName); + // } + // }); + + // const modulePromises = Array.from(modules.entries()).map( + // async ([sourceId, displayName]) => ({ + // id: `module_${device}_${sourceId}`, + // title: displayName, + // items: await this.generateMetricItems(device, sourceId, seriesData, metadataMap), + // isDynamic: true + // }) + // ); + + // return Promise.all(modulePromises); + // } + + // private async generateMetricItems( + // device: string, + // module: string, + // seriesData: { metric: string; labels: Record }[], + // metadataMap: Map + // ): Promise { + // const ranges = await this.rangeService.getRanges(); + // const filtered = seriesData.filter( + // ({ labels }) => labels.device === device && labels.source_id === module + // ); + + // const uniqueMetrics = new Set(filtered.map(entry => entry.metric)); + // const safeDevice = this.normalizeIdPart(device); + // const safeModule = this.normalizeIdPart(module); + + // return Array.from(uniqueMetrics).map(metric => { + // const description = metadataMap.get(metric) || metric; + // const safeMetric = this.normalizeIdPart(metric); + // const metricRanges = ranges[description] || []; + + // return { + // id: `metric_${safeDevice}_${safeModule}_${safeMetric}`, + // title: description, + // metric, + // filters: { + // device, + // source_id: module + // }, + // ranges: metricRanges, + // isDynamic: true, + // meta: { + // originalDevice: device, + // originalModule: module + // } + // }; + // }); + // } + + //ВРЕМЕННЫЙ КОСТЫЛЬ + private async generateModuleItems( device: string, seriesData: { metric: string; labels: Record }[], metadataMap: Map ): Promise { const modules = new Map(); + const specialFolders = new Map>(); seriesData.forEach(({ labels }) => { if (labels.device === device && labels.source_id) { - const sourceId = labels.source_id; - let displayName = sourceId; + const sourceId = this.normalizeSourceId(labels.source_id); - if (sourceId.startsWith('module$')) { - displayName = `Module ${sourceId.split('$')[1]}`; - } else if (sourceId.startsWith('port$')) { - displayName = `Port ${sourceId.split('$')[1]}`; + if (sourceId.includes(', complex') || sourceId.includes(', integration')) { + const [modulePart, folderType] = sourceId.split(', ').map(s => s.trim()); + let displayName = modulePart; + + if (modulePart.startsWith('module$')) { + displayName = `Module ${modulePart.split('$')[1]}`; + } else if (modulePart.startsWith('port$')) { + displayName = `Port ${modulePart.split('$')[1]}`; + } else if (modulePart === 'undefined') { + displayName = 'Unknown Module'; + } + + if (!specialFolders.has(folderType)) { + specialFolders.set(folderType, new Map()); + } + specialFolders.get(folderType)!.set(modulePart, displayName); + } + else if (sourceId.endsWith('complex') || sourceId.endsWith('integration')) { + console.warn(`Ignoring legacy format: ${sourceId} for device ${device}`); + } + else { + let displayName = sourceId; + if (sourceId.startsWith('module$')) { + displayName = `Module ${sourceId.split('$')[1]}`; + } else if (sourceId.startsWith('port$')) { + displayName = `Port ${sourceId.split('$')[1]}`; + } + modules.set(sourceId, displayName); } - - modules.set(sourceId, displayName); } }); - const modulePromises = Array.from(modules.entries()).map( + const moduleItems = Array.from(modules.entries()).map( async ([sourceId, displayName]) => ({ - id: `module_${device}_${sourceId}`, + id: `module_${device}_${this.normalizeIdPart(sourceId)}`, title: displayName, items: await this.generateMetricItems(device, sourceId, seriesData, metadataMap), isDynamic: true }) ); - return Promise.all(modulePromises); + const specialFolderItems = Array.from(specialFolders.entries()).map( + async ([folderType, folderModules]) => { + const folderItems = await Promise.all( + Array.from(folderModules.entries()).map( + async ([sourceId, displayName]) => ({ + id: `module_${device}_${this.normalizeIdPart(sourceId)}_${this.normalizeIdPart(folderType)}`, + title: displayName, + items: await this.generateMetricItems( + device, + `${sourceId}, ${folderType}`, + seriesData, + metadataMap + ), + isDynamic: true + }) + ) + ); + + return { + id: `folder_${device}_${this.normalizeIdPart(folderType)}`, + title: folderType, + items: folderItems, + isDynamic: true + }; + } + ); + + return [ + ...(await Promise.all(moduleItems)), + ...(await Promise.all(specialFolderItems)) + ]; + } + + private normalizeSourceId(raw: string): string { + return raw.split(',').map(s => s.trim()).filter(Boolean).join(', '); } private async generateMetricItems( @@ -231,13 +409,33 @@ export class MenuService { metadataMap: Map ): Promise { const ranges = await this.rangeService.getRanges(); - const filtered = seriesData.filter( - ({ labels }) => labels.device === device && labels.source_id === module + + const normModule = this.normalizeSourceId(module); + const isPlainModule = !normModule.includes(','); + + let filtered = seriesData.filter(({ labels }) => + labels.device === device && + this.normalizeSourceId(labels.source_id || '') === normModule ); + if (isPlainModule) { + const base = normModule; + const shadowSuffixes = ['integration', 'complex']; + + filtered = filtered.filter(entry => { + return !shadowSuffixes.some(suffix => + seriesData.some(s => + s.metric === entry.metric && + s.labels.device === device && + this.normalizeSourceId(s.labels.source_id || '') === `${base}, ${suffix}` + ) + ); + }); + } + const uniqueMetrics = new Set(filtered.map(entry => entry.metric)); const safeDevice = this.normalizeIdPart(device); - const safeModule = this.normalizeIdPart(module); + const safeModule = this.normalizeIdPart(normModule); return Array.from(uniqueMetrics).map(metric => { const description = metadataMap.get(metric) || metric; @@ -250,13 +448,13 @@ export class MenuService { metric, filters: { device, - source_id: module + source_id: normModule }, ranges: metricRanges, isDynamic: true, meta: { originalDevice: device, - originalModule: module + originalModule: normModule } }; }); @@ -283,21 +481,20 @@ export class MenuService { } async updateMenuItem(id: string, update: Partial): Promise { - const { menu: fullMenu } = await this.getFullMenuWithCache(); - const item = this.findMenuItem(fullMenu, id); + const existing = this.userOverrides.get(id) || { id }; + this.userOverrides.set(id, { ...existing, ...update }); - if (!item) throw new Error('Menu item not found'); - Object.assign(item, update); + await this.saveUserOverrides(); + this.invalidateCache(); + const { menu } = await this.getFullMenuWithCache(); + const updated = this.findMenuItem(menu, id); - this.menuCache = null; - return item; - } + if (!updated) { + throw new HttpException('Updated item not found', HttpStatus.NOT_FOUND); + } - async saveOverrides(overrides: Partial[]): Promise { - await fs.writeFile(this.menuOverridesPath, JSON.stringify({ overrides }, null, 2), 'utf-8'); - - this.menuCache = null; + return updated; } invalidateCache(): void { diff --git a/src/prometheus/Subscription.interface.ts b/src/prometheus/Subscription.interface.ts deleted file mode 100644 index e69de29..0000000 diff --git a/src/prometheus/index.ts b/src/prometheus/index.ts new file mode 100644 index 0000000..4213d99 --- /dev/null +++ b/src/prometheus/index.ts @@ -0,0 +1,5 @@ +// export * from './prometheus-metric.interface'; +// export * from './prometheus.service'; +// export * from './prometheus-cache.service'; +// export * from './prometheus-query.service'; +// export * from './prometheus-metadata.service'; \ No newline at end of file diff --git a/src/prometheus/metrics.gateway.ts b/src/prometheus/metrics.gateway.ts index 366b7c7..af8e1c7 100644 --- a/src/prometheus/metrics.gateway.ts +++ b/src/prometheus/metrics.gateway.ts @@ -1,23 +1,39 @@ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { Server, WebSocket } from 'ws'; import { createServer } from 'http'; import { PrometheusService } from './prometheus.service'; import { ConfigService } from '@nestjs/config'; +import { PrometheusMetric } from './prometheus-metric.interface'; type Filters = Record; +interface RealtimeSubscription { + clients: Set; + interval: NodeJS.Timeout; + metric: string; + filters: Filters; + lastData: PrometheusMetric[]; +} + +interface HistoricalRequest { + client: WebSocket; + requestId: string; +} + @Injectable() -export class MetricsGateway implements OnModuleInit { +export class MetricsGateway implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(MetricsGateway.name); private wss: Server; + private httpServer: ReturnType; - private activeSockets = new Map(); - private metricSubscriptions = new Map< - string, - { stopUpdates: () => void; clients: Set } - >(); + // Real-time подписки (одна на метрику, много клиентов) + private realtimeSubscriptions = new Map(); - private lastSentData = new Map(); + // Активные клиенты + private activeClients = new Map(); + + // Исторические запросы (для отслеживания) + private historicalRequests = new Map(); constructor( private readonly prometheusService: PrometheusService, @@ -25,367 +41,429 @@ export class MetricsGateway implements OnModuleInit { ) { } onModuleInit() { - const httpServer = createServer(); + this.httpServer = createServer(); this.wss = new Server({ - server: httpServer, + server: this.httpServer, path: '/metrics-ws', }); this.wss.on('connection', (client, request) => this.handleConnection(client, request) ); + this.wss.on('error', (err) => this.logger.error('WebSocket server error:', err) ); const wsPort = Number(this.configService.get('WS_PORT') || 3001); - httpServer.listen(wsPort, () => { + this.httpServer.listen(wsPort, () => { this.logger.log( - `WebSocket server running at ws://localhost:${wsPort}/api/metrics-ws` + `WebSocket server running at ws://localhost:${wsPort}/metrics-ws` ); }); } + onModuleDestroy() { + // Очистка всех ресурсов + this.clearAllSubscriptions(); + this.wss?.close(); + this.httpServer?.close(); + } private handleConnection(client: WebSocket, request: any) { - let clientId = - this.getQueryParams(request?.url).clientId || - Math.random().toString(36).slice(2); + const clientId = this.getClientId(request?.url); + this.activeClients.set(clientId, client); - this.activeSockets.set(clientId, client); this.logger.log(`Client connected: ${clientId}`); + this.logger.debug(`Active clients: ${this.activeClients.size}, Subscriptions: ${this.realtimeSubscriptions.size}`); client.on('message', (raw) => { try { - const msg = JSON.parse(raw.toString()); - this.handleMessage(clientId, client, msg); + const message = JSON.parse(raw.toString()); + this.handleMessage(clientId, client, message); } catch (err) { - this.sendError(client, 'Invalid JSON'); + this.sendError(client, 'Invalid JSON format'); } }); - client.on('close', () => this.cleanupClient(clientId)); + client.on('close', () => this.handleClientDisconnect(clientId)); client.on('error', (err) => { this.logger.error(`Client ${clientId} error:`, err); - this.cleanupClient(clientId); + this.handleClientDisconnect(clientId); + }); + + // Отправляем приветственное сообщение + this.sendMessage(client, { + event: 'connected', + data: { clientId, timestamp: Date.now() } }); } private handleMessage(clientId: string, client: WebSocket, message: any) { - const { event, data } = message || {}; - if (!event) return this.sendError(client, 'Event type is required'); + const { event, data, requestId } = message; + + if (!event) { + return this.sendError(client, 'Event type is required', requestId); + } + + this.logger.debug(`Received event: ${event} from client: ${clientId}`); switch (event) { + case 'subscribe-realtime': + return this.handleSubscribeRealtime(clientId, client, data, requestId); + + case 'unsubscribe-realtime': + return this.handleUnsubscribeRealtime(clientId, data, requestId); + case 'unsubscribe-all': - return this.unsubscribeAllForClient(clientId); + return this.handleUnsubscribeAll(clientId, requestId); - case 'get-metrics': - return this.handleGetMetrics(client, data); + case 'get-historical': + return this.handleGetHistorical(client, data, requestId); - case 'subscribe-metric': - return this.handleSubscribeMetric(clientId, client, data); - - case 'unsubscribe-metric': - return this.handleUnsubscribeMetric(clientId, data); + case 'get-current': + return this.handleGetCurrent(client, data, requestId); default: - return this.sendError(client, `Unknown event type: ${event}`); + return this.sendError(client, `Unknown event type: ${event}`, requestId); } } - private cleanupClient(clientId: string) { - if (!this.activeSockets.has(clientId)) return; - this.logger.log(`Client disconnected: ${clientId}`); - this.activeSockets.delete(clientId); - - setTimeout(() => { - if (!this.activeSockets.has(clientId)) { - } - }, 5000); - - for (const [key, sub] of this.metricSubscriptions) { - sub.clients.delete(clientId); - if (sub.clients.size === 0) { - sub.stopUpdates(); - this.metricSubscriptions.delete(key); - this.lastSentData.delete(key); - } - } - } - - private unsubscribeAllForClient(clientId: string) { - for (const [key, sub] of this.metricSubscriptions) { - if (sub.clients.has(clientId)) sub.clients.delete(clientId); - if (sub.clients.size === 0) { - sub.stopUpdates(); - this.metricSubscriptions.delete(key); - this.lastSentData.delete(key); - } - } - } - - - private async handleGetMetrics(client: WebSocket, payload: any) { - const { - metric, - start, - end, - step, - isRangeQuery, - requestId, - filters = {}, - } = payload || {}; + private async handleSubscribeRealtime( + clientId: string, + client: WebSocket, + payload: any, + requestId?: string + ) { + const { metric, filters = {}, interval = 10000 } = payload || {}; if (!metric) { return this.sendError(client, 'Metric name is required', requestId); } - if (isRangeQuery) { - try { - const rangeData = await this.prometheusService.fetchMetricsRange( - metric, - start, - end, - step, - filters - ); - this.logger.debug('RangeQuery result', JSON.stringify(rangeData).slice(0, 200)); - - return this.sendMessage(client, { - event: 'metrics-data', - data: rangeData, - metric, - requestId, - }); - } catch (err: any) { - return this.sendError(client, err?.message || 'Range query error', requestId); - } - } + const subscriptionKey = this.getSubscriptionKey(metric, filters); try { - const subscriptionKey = this.getSubscriptionKey(metric, filters); + // Если подписка уже существует, просто добавляем клиента + if (this.realtimeSubscriptions.has(subscriptionKey)) { + const subscription = this.realtimeSubscriptions.get(subscriptionKey)!; + subscription.clients.add(clientId); - const initialData = - await this.prometheusService.fetchMetricsWithFilters(metric, filters); + this.logger.log(`Client ${clientId} added to existing subscription: ${subscriptionKey}`); + // Отправляем последние данные клиенту + if (subscription.lastData) { + this.sendMessage(client, { + event: 'realtime-data', + data: { + metric, + filters, + data: subscription.lastData, + type: 'initial' + }, + requestId + }); + } + return; + } + + // Создаем новую подписку + this.logger.log(`Creating new realtime subscription: ${subscriptionKey}`); + + // Первоначальная загрузка данных + const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); + + if (!Array.isArray(initialData)) { + throw new Error(`Expected array for metric ${metric}, got ${typeof initialData}`); + } + + // Создаем интервал для обновлений + const intervalId = setInterval( + () => this.updateRealtimeData(subscriptionKey), + interval + ); + + // Сохраняем подписку + const subscription: RealtimeSubscription = { + clients: new Set([clientId]), + interval: intervalId, + metric, + filters, + lastData: initialData + }; + + this.realtimeSubscriptions.set(subscriptionKey, subscription); + + // Отправляем данные клиенту this.sendMessage(client, { - event: 'metrics-data', - data: { metric, data: initialData, requestId }, + event: 'realtime-data', + data: { + metric, + filters, + data: initialData, + type: 'initial' + }, + requestId }); - let lastLocal = initialData; + this.logger.debug(`Subscription created for ${subscriptionKey} with ${interval}ms interval`); - const jitter = Math.floor(Math.random() * 5000); - setTimeout(() => { - const timer = setInterval(async () => { - try { - const fresh = - await this.prometheusService.fetchMetricsWithFilters( - metric, - filters - ); - if (!this.isDataEqual(lastLocal, fresh)) { - this.sendMessage(client, { - event: 'metrics-data', - data: { metric, data: fresh, requestId }, - }); - lastLocal = fresh; - } - } catch (e) { - this.logger.error( - `Error in on-demand periodic update for ${subscriptionKey}:`, - (e as any)?.message - ); - } - }, step || 5000); - - const closeOnce = () => clearInterval(timer); - client.once('close', closeOnce); - client.once('error', closeOnce); - }, jitter); - } catch (err: any) { - return this.sendError(client, err?.message || 'get-metrics error', requestId); + } catch (error) { + this.logger.error(`Subscribe error for ${subscriptionKey}:`, error); + this.sendError(client, error.message, requestId); } } - private async handleSubscribeMetric(clientId: string, client: WebSocket, payload: any) { - const { metric, interval = 60000, filters = {} } = payload || {}; + private async updateRealtimeData(subscriptionKey: string) { + const subscription = this.realtimeSubscriptions.get(subscriptionKey); + if (!subscription) return; + + try { + const freshData = await this.prometheusService.fetchMetricsWithFilters( + subscription.metric, + subscription.filters + ); + + if (!this.isDataEqual(subscription.lastData, freshData)) { + subscription.lastData = freshData; + + // Рассылаем обновление всем подписанным клиентам + this.broadcastToClients(Array.from(subscription.clients), { + event: 'realtime-data', + data: { + metric: subscription.metric, + filters: subscription.filters, + data: freshData, + type: 'update' + } + }); + + this.logger.debug(`Data updated for subscription: ${subscriptionKey}`); + } + } catch (error) { + this.logger.error(`Update error for ${subscriptionKey}:`, error); + } + } + + private handleUnsubscribeRealtime( + clientId: string, + payload: any, + requestId?: string + ) { + const { metric, filters = {} } = payload || {}; + if (!metric) { - this.sendError(client, 'Metric name is required'); return; } - const key = this.getSubscriptionKey(metric, filters); + const subscriptionKey = this.getSubscriptionKey(metric, filters); + const subscription = this.realtimeSubscriptions.get(subscriptionKey); + + if (!subscription) { + this.logger.debug(`No subscription found for: ${subscriptionKey}`); + return; + } + + // Удаляем клиента из подписки + subscription.clients.delete(clientId); + this.logger.log(`Client ${clientId} unsubscribed from: ${subscriptionKey}`); + + // Если больше нет клиентов, очищаем подписку + if (subscription.clients.size === 0) { + clearInterval(subscription.interval); + this.realtimeSubscriptions.delete(subscriptionKey); + this.logger.log(`Subscription removed: ${subscriptionKey}`); + } + } + + private handleUnsubscribeAll(clientId: string, requestId?: string) { + let unsubscribedCount = 0; + + for (const [key, subscription] of this.realtimeSubscriptions) { + if (subscription.clients.has(clientId)) { + subscription.clients.delete(clientId); + unsubscribedCount++; + + if (subscription.clients.size === 0) { + clearInterval(subscription.interval); + this.realtimeSubscriptions.delete(key); + } + } + } + + this.logger.log(`Client ${clientId} unsubscribed from ${unsubscribedCount} subscriptions`); + } + + private async handleGetHistorical( + client: WebSocket, + payload: any, + requestId?: string + ) { + const { metric, start, end, step = 60, filters = {} } = payload || {}; + + if (!metric) { + return this.sendError(client, 'Metric name is required', requestId); + } + + if (!start || !end) { + return this.sendError(client, 'Start and end time are required', requestId); + } + + const requestKey = `${requestId || Date.now()}-${metric}`; try { - const initial = await this.prometheusService.fetchMetricsWithFilters(metric, filters); + this.logger.debug(`Fetching historical data for: ${metric}, from ${new Date(start).toISOString()} to ${new Date(end).toISOString()}`); - if (!Array.isArray(initial)) { - throw new Error(`Expected array for metric ${metric}, got ${typeof initial}`); - } - - this.sendMessage(client, { - event: 'metrics-data', - data: { - metric: key, - data: initial, - type: 'initial' - } - }); - - this.lastSentData.set(key, initial); - - const stopUpdates = await this.sendPeriodicUpdates( + const historicalData = await this.prometheusService.fetchMetricsRange( metric, - interval, - (freshData) => { - if (!Array.isArray(freshData)) { - this.logger.error(`Periodic update: expected array for ${key}, got ${typeof freshData}`); - return; - } - - const lastData = this.lastSentData.get(key); - if (!this.isDataEqual(lastData, freshData)) { - this.broadcast({ - event: 'metrics-data', - data: { - metric: key, - data: freshData, - type: 'update' - } - }); - this.lastSentData.set(key, freshData); - } - }, + Math.floor(start / 1000), // Convert to seconds + Math.floor(end / 1000), // Convert to seconds + step, filters ); - this.metricSubscriptions.set(key, { - stopUpdates, - clients: new Set([clientId]), + this.sendMessage(client, { + event: 'historical-data', + data: { + metric, + filters, + data: historicalData, + start, + end, + step + }, + requestId }); - const unsubscribe = () => { - this.logger.log(`Unsubscribing client ${clientId} from ${key}`); - const subscription = this.metricSubscriptions.get(key); - if (subscription) { - subscription.clients.delete(clientId); - if (subscription.clients.size === 0) { - subscription.stopUpdates(); - this.metricSubscriptions.delete(key); - this.lastSentData.delete(key); - } - } - }; - - client.on('close', unsubscribe); - client.on('error', unsubscribe); + this.logger.debug(`Historical data sent for: ${metric}, points: ${historicalData.length}`); } catch (error) { - this.logger.error(`Subscription error for ${key}:`, error); - this.sendError(client, error.message); - - if (!this.metricSubscriptions.has(key)) { - this.lastSentData.delete(key); - } + this.logger.error(`Historical data error for ${metric}:`, error); + this.sendError(client, error.message, requestId); } } - private handleUnsubscribeMetric( - clientId: string, - payload: { metric: string; filters?: Filters } + private async handleGetCurrent( + client: WebSocket, + payload: any, + requestId?: string ) { const { metric, filters = {} } = payload || {}; - if (!metric) return; - const key = this.getSubscriptionKey(metric, filters); - const sub = this.metricSubscriptions.get(key); - if (!sub) return; + if (!metric) { + return this.sendError(client, 'Metric name is required', requestId); + } - sub.clients.delete(clientId); - if (sub.clients.size === 0) { - sub.stopUpdates(); - this.metricSubscriptions.delete(key); - this.lastSentData.delete(key); + try { + const currentData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); + + this.sendMessage(client, { + event: 'current-data', + data: { + metric, + filters, + data: currentData, + timestamp: Date.now() + }, + requestId + }); + + } catch (error) { + this.logger.error(`Current data error for ${metric}:`, error); + this.sendError(client, error.message, requestId); } } + private handleClientDisconnect(clientId: string) { + this.logger.log(`Client disconnected: ${clientId}`); - private getQueryParams(rawUrl?: string): Record { + // Удаляем клиента из всех подписок + this.handleUnsubscribeAll(clientId); + + // Удаляем из активных клиентов + this.activeClients.delete(clientId); + + this.logger.debug(`Active clients: ${this.activeClients.size}, Subscriptions: ${this.realtimeSubscriptions.size}`); + } + + private clearAllSubscriptions() { + for (const [key, subscription] of this.realtimeSubscriptions) { + clearInterval(subscription.interval); + } + this.realtimeSubscriptions.clear(); + this.logger.log('All subscriptions cleared'); + } + + private getClientId(url?: string): string { + const params = this.getQueryParams(url); + return params.clientId || `client-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`; + } + + private getQueryParams(url?: string): Record { try { - const url = new URL(rawUrl || '', 'http://localhost'); // безопасная база - return Object.fromEntries(url.searchParams.entries()); + if (!url) return {}; + const urlObj = new URL(url, 'http://localhost'); + return Object.fromEntries(urlObj.searchParams.entries()); } catch { return {}; } } private getSubscriptionKey(metric: string, filters: Filters): string { - const keys = Object.keys(filters).sort(); - const filterString = keys - .map((k) => `${k}=${encodeURIComponent(filters[k])}`) + const sortedFilters = Object.keys(filters) + .sort() + .map(key => `${key}=${encodeURIComponent(filters[key])}`) .join('&'); - return `${metric}${filterString ? `?${filterString}` : ''}`; + + return sortedFilters ? `${metric}?${sortedFilters}` : metric; } - private isDataEqual(a: any[], b: any[]) { - if (!a || !b || a.length !== b.length) return false; - return a.every((item, i) => { - const x = b[i]; + private isDataEqual(a: PrometheusMetric[], b: PrometheusMetric[]): boolean { + if (!Array.isArray(a) || !Array.isArray(b) || a.length !== b.length) { + return false; + } + + return a.every((itemA, index) => { + const itemB = b[index]; return ( - item?.value === x?.value && - item?.status === x?.status && - item?.timestamp === x?.timestamp + itemA.value === itemB.value && + itemA.timestamp === itemB.timestamp && + itemA.device === itemB.device && + itemA.source_id === itemB.source_id ); }); } - private async sendPeriodicUpdates( - metric: string, - interval: number, - cb: (data: any) => void, - filters: Filters - ) { - const initialDelay = Math.floor(Math.random() * 5000); - await new Promise((r) => setTimeout(r, initialDelay)); - - const timer = setInterval(async () => { - try { - const data = await this.prometheusService.fetchMetricsWithFilters( - metric, - filters - ); - cb(data); - } catch (e: any) { - this.logger.error( - `Error in periodic update for ${metric}:`, - e?.message - ); - } - }, interval); - - return () => clearInterval(timer); - } - private sendMessage(client: WebSocket, message: any) { if (client.readyState === WebSocket.OPEN) { - client.send(JSON.stringify(message)); + try { + client.send(JSON.stringify(message)); + } catch (error) { + this.logger.error('Error sending message to client:', error); + } } } - private broadcast(message: any) { - const raw = JSON.stringify(message); - this.wss.clients.forEach((c) => { - if (c.readyState === WebSocket.OPEN) c.send(raw); + private broadcastToClients(clientIds: string[], message: any) { + const messageStr = JSON.stringify(message); + + clientIds.forEach(clientId => { + const client = this.activeClients.get(clientId); + if (client && client.readyState === WebSocket.OPEN) { + try { + client.send(messageStr); + } catch (error) { + this.logger.error(`Error broadcasting to client ${clientId}:`, error); + } + } }); } private sendError(client: WebSocket, error: string, requestId?: string) { this.sendMessage(client, { - event: 'metrics-error', + event: 'error', data: { error, requestId }, + requestId }); } -} - - +} \ No newline at end of file diff --git a/src/prometheus/prometheus-cache.service.ts b/src/prometheus/prometheus-cache.service.ts new file mode 100644 index 0000000..804603d --- /dev/null +++ b/src/prometheus/prometheus-cache.service.ts @@ -0,0 +1,49 @@ +import { Injectable } from '@nestjs/common'; +import { PrometheusMetric } from './prometheus-metric.interface'; + +interface CacheEntry { + data: T; + timestamp: number; +} + +interface MetadataCacheEntry { + type: string | null; + description: string | undefined; + timestamp: number; +} + +@Injectable() +export class PrometheusCacheService { + private metricCache = new Map>(); + private metadataCache = new Map(); + + getMetricCache(key: string): CacheEntry | undefined { + return this.metricCache.get(key); + } + + setMetricCache(key: string, data: PrometheusMetric[], ttl: number = 5000): void { + this.metricCache.set(key, { data, timestamp: Date.now() + ttl }); + } + + getMetadataCache(key: string): MetadataCacheEntry | undefined { + return this.metadataCache.get(key); + } + + setMetadataCache( + key: string, + type: string | null, + description: string | undefined, + ttl: number = 30000 + ): void { + this.metadataCache.set(key, { type, description, timestamp: Date.now() + ttl }); + } + + isCacheValid(cacheEntry: { timestamp: number }): boolean { + return Date.now() < cacheEntry.timestamp; + } + + clearCache(): void { + this.metricCache.clear(); + this.metadataCache.clear(); + } +} \ No newline at end of file diff --git a/src/prometheus/prometheus-query.service.ts b/src/prometheus/prometheus-query.service.ts new file mode 100644 index 0000000..0e91855 --- /dev/null +++ b/src/prometheus/prometheus-query.service.ts @@ -0,0 +1,27 @@ +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class PrometheusQueryService { + buildFilteredQuery(metric: string, filters: Record): string { + const filterParts = Object.entries(filters) + .filter(([_, value]) => value !== undefined && value !== null && value !== "") + .map(([key, value]) => `${key}="${value}"`); + + return filterParts.length > 0 + ? `${metric}{${filterParts.join(',')}}` + : metric; + } + + calculateOptimalStep(start: number, end: number): number { + const duration = end - start; + return Math.max(Math.floor(duration / 1000), 15); + } + + generateCacheKey(metric: string, filters: Record = {}): string { + return `${metric}:${JSON.stringify(filters)}`; + } + + generateMetadataCacheKey(metric: string, type: 'type' | 'description'): string { + return `metadata-${type}-${metric}`; + } +} \ No newline at end of file diff --git a/src/prometheus/prometheus.module.ts b/src/prometheus/prometheus.module.ts index 994b741..5ee8479 100644 --- a/src/prometheus/prometheus.module.ts +++ b/src/prometheus/prometheus.module.ts @@ -1,13 +1,21 @@ import { Module } from '@nestjs/common'; import { HttpModule } from '@nestjs/axios'; +import { ConfigModule } from '@nestjs/config'; import { PrometheusService } from './prometheus.service'; import { MetricsController } from './metrics.controller'; import { MetricsGateway } from './metrics.gateway'; +import { PrometheusCacheService } from './prometheus-cache.service'; +import { PrometheusQueryService } from './prometheus-query.service'; @Module({ - imports: [HttpModule], - providers: [PrometheusService, MetricsGateway], - controllers: [MetricsController], - exports: [PrometheusService] - }) - export class PrometheusModule {} \ No newline at end of file + imports: [HttpModule, ConfigModule], + providers: [ + PrometheusCacheService, + PrometheusQueryService, + PrometheusService, + MetricsGateway + ], + controllers: [MetricsController], + exports: [PrometheusService] +}) +export class PrometheusModule { } \ No newline at end of file diff --git a/src/prometheus/prometheus.service.ts b/src/prometheus/prometheus.service.ts index 396e388..3388095 100644 --- a/src/prometheus/prometheus.service.ts +++ b/src/prometheus/prometheus.service.ts @@ -2,46 +2,54 @@ import { Injectable } from '@nestjs/common'; import { HttpService } from '@nestjs/axios'; import { ConfigService } from '@nestjs/config'; import { lastValueFrom } from 'rxjs'; -import { PrometheusMetric } from './prometheus-metric.interface'; import { MenuItem } from '../menu/menu.interface'; +import { PrometheusMetric } from './prometheus-metric.interface'; +import { PrometheusCacheService } from './prometheus-cache.service'; +import { PrometheusQueryService } from './prometheus-query.service'; + +interface PrometheusResponse { + status: string; + data: any; +} @Injectable() export class PrometheusService { private readonly prometheusUrl: string; - private metricCache = new Map(); - private metadataCache = new Map(); constructor( private readonly httpService: HttpService, - private readonly configService: ConfigService + private readonly configService: ConfigService, + private readonly cacheService: PrometheusCacheService, + private readonly queryService: PrometheusQueryService ) { this.prometheusUrl = this.configService.get('PROMETHEUS_API', 'http://localhost:9090'); console.log('Prometheus API URL:', this.prometheusUrl); } + private async executeQuery(url: string, params: any): Promise { + const response = await lastValueFrom( + this.httpService.get(url, { params }) + ); + return response.data; + } + async fetchMetricType(metric: string): Promise { - const cacheKey = `metadata-type-${metric}`; - const cacheEntry = this.metadataCache.get(cacheKey); - - if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) { + const cacheKey = this.queryService.generateMetadataCacheKey(metric, 'type'); + const cacheEntry = this.cacheService.getMetadataCache(cacheKey); + + if (cacheEntry && this.cacheService.isCacheValid(cacheEntry)) { return cacheEntry.type; } try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/metadata`, { - params: { metric }, - }) - ); - const metadata = response.data.data[metric]; - const result = metadata?.length ? metadata[0].type : null; - - this.metadataCache.set(cacheKey, { - type: result, - description: cacheEntry?.description, - timestamp: Date.now() + const data = await this.executeQuery(`${this.prometheusUrl}/metadata`, { + params: { metric }, }); - + + const metadata = data.data[metric]; + const result = metadata?.length ? metadata[0].type : null; + + this.cacheService.setMetadataCache(cacheKey, result, cacheEntry?.description); return result; } catch (error) { console.error(`Ошибка при получении типа метрики ${metric}:`, error); @@ -50,28 +58,22 @@ export class PrometheusService { } async fetchMetricDescription(metric: string): Promise { - const cacheKey = `metadata-description-${metric}`; - const cacheEntry = this.metadataCache.get(cacheKey); - - if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) { + const cacheKey = this.queryService.generateMetadataCacheKey(metric, 'description'); + const cacheEntry = this.cacheService.getMetadataCache(cacheKey); + + if (cacheEntry && this.cacheService.isCacheValid(cacheEntry)) { return cacheEntry.description; } try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/metadata`, { - params: { metric }, - }) - ); - const metadata = response.data.data[metric]; + const data = await this.executeQuery(`${this.prometheusUrl}/metadata`, { + params: { metric }, + }); + + const metadata = data.data[metric]; const result = metadata?.length ? metadata[0].help : undefined; - - this.metadataCache.set(cacheKey, { - type: cacheEntry?.type ?? null, - description: result, - timestamp: Date.now() - }); - + + this.cacheService.setMetadataCache(cacheKey, cacheEntry?.type ?? null, result); return result; } catch (error) { console.error(`Ошибка при получении описания метрики ${metric}:`, error); @@ -79,39 +81,46 @@ export class PrometheusService { } } + private transformMetricData( + entry: any, + metric: string, + type: string | null, + description: string | undefined + ): PrometheusMetric { + return { + __name__: entry.metric.__name__ || metric, + device: entry.metric.device || '', + source_id: entry.metric.source_id || '', + value: parseFloat(entry.value[1]), + timestamp: entry.value[0] * 1000, + type: type || 'gauge', + description + }; + } + async fetchMetrics(metric: string): Promise { - const cacheKey = `${metric}:{}`; - const cacheEntry = this.metricCache.get(cacheKey); - - if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) { + const cacheKey = this.queryService.generateCacheKey(metric); + const cacheEntry = this.cacheService.getMetricCache(cacheKey); + + if (cacheEntry && this.cacheService.isCacheValid(cacheEntry)) { return cacheEntry.data; } try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/query`, { - params: { query: metric }, - }) + const data = await this.executeQuery(`${this.prometheusUrl}/query`, { + query: metric + }); + + const [type, description] = await Promise.all([ + this.fetchMetricType(metric), + this.fetchMetricDescription(metric) + ]); + + const result = data.data.result.map((entry: any) => + this.transformMetricData(entry, metric, type, description) ); - const metricType = await this.fetchMetricType(metric); - const metricDescription = await this.fetchMetricDescription(metric); - - const result = response.data.data.result.map((entry): PrometheusMetric => ({ - __name__: entry.metric.__name__ || metric, - device: entry.metric.device, - instance: entry.metric.instance, - job: entry.metric.job, - source_id: entry.metric.source_id, - status: entry.metric.status || '0', - timestamp: entry.value[0] * 1000, - value: parseFloat(entry.value[1]), - type: metricType || 'gauge', - description: metricDescription, - ...entry.metric - })); - - this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() }); + this.cacheService.setMetricCache(cacheKey, result); return result; } catch (error) { console.error(`Error fetching metrics for ${metric}:`, error); @@ -121,39 +130,27 @@ export class PrometheusService { } async fetchMetricsWithFilters(metric: string, filters: Record): Promise { - const cacheKey = `${metric}:${JSON.stringify(filters)}`; - const cacheEntry = this.metricCache.get(cacheKey); - - if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) { + const cacheKey = this.queryService.generateCacheKey(metric, filters); + const cacheEntry = this.cacheService.getMetricCache(cacheKey); + + if (cacheEntry && this.cacheService.isCacheValid(cacheEntry)) { return cacheEntry.data; } try { - const query = this.buildFilteredQuery(metric, filters); - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/query`, { - params: { query } - }) + const query = this.queryService.buildFilteredQuery(metric, filters); + const data = await this.executeQuery(`${this.prometheusUrl}/query`, { query }); + + const [type, description] = await Promise.all([ + this.fetchMetricType(metric), + this.fetchMetricDescription(metric) + ]); + + const result = data.data.result.map((entry: any) => + this.transformMetricData(entry, metric, type, description) ); - const metricType = await this.fetchMetricType(metric); - const metricDescription = await this.fetchMetricDescription(metric); - - const result = response.data.data.result.map((entry): PrometheusMetric => ({ - __name__: entry.metric.__name__ || metric, - device: entry.metric.device, - instance: entry.metric.instance, - job: entry.metric.job, - source_id: entry.metric.source_id, - status: entry.metric.status || '0', - timestamp: entry.value[0] * 1000, - value: parseFloat(entry.value[1]), - type: metricType || 'gauge', - description: metricDescription, - ...entry.metric - })); - - this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() }); + this.cacheService.setMetricCache(cacheKey, result); return result; } catch (error) { console.error(`Error fetching metrics with filters for ${metric}:`, error); @@ -162,56 +159,42 @@ export class PrometheusService { } } - private buildFilteredQuery(metric: string, filters: Record): string { - const filterParts = Object.entries(filters) - .filter(([_, value]) => value !== undefined && value !== null && value !== "") - .map(([key, value]) => { - return `${key}="${value}"`; - }); - - return filterParts.length > 0 - ? `${metric}{${filterParts.join(',')}}` - : metric; - } - - async fetchMetricsRange(metric: string, start: number, end: number, step: number, filters: Record = {}): Promise { - // Рассчитываем оптимальный шаг, если не указан - const duration = end - start; - const optimalStep = Math.max(Math.floor(duration / 1000), 15); // Минимум 15 секунд - - const query = this.buildFilteredQuery(metric, { + async fetchMetricsRange( + metric: string, + start: number, + end: number, + step: number, + filters: Record = {} + ): Promise { + const query = this.queryService.buildFilteredQuery(metric, { ...filters, instance: '192.168.2.34:9050' }); + const optimalStep = this.queryService.calculateOptimalStep(start, end); + try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/query_range`, { - params: { - query, - start, - end, - step: optimalStep.toString() - }, - }) - ); + const data = await this.executeQuery(`${this.prometheusUrl}/query_range`, { + query, + start, + end, + step: optimalStep.toString() + }); - const metricType = await this.fetchMetricType(metric); - const metricDescription = await this.fetchMetricDescription(metric); + const [type, description] = await Promise.all([ + this.fetchMetricType(metric), + this.fetchMetricDescription(metric) + ]); - return response.data.data.result.flatMap((entry) => - entry.values.map((value): PrometheusMetric => ({ + return data.data.result.flatMap((entry: any) => + entry.values.map((value: any) => ({ __name__: entry.metric.__name__ || metric, - device: entry.metric.device, - instance: entry.metric.instance, - job: entry.metric.job, - source_id: entry.metric.source_id, - status: entry.metric.status || '0', - timestamp: value[0] * 1000, + device: entry.metric.device || '', + source_id: entry.metric.source_id || '', value: parseFloat(value[1]), - type: metricType || 'gauge', - description: metricDescription, - ...entry.metric + timestamp: value[0] * 1000, + type: type || 'gauge', + description })) ); } catch (error) { @@ -238,36 +221,30 @@ export class PrometheusService { type?: string; }> { try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/metadata`, { - params: { metric } - }) - ); + const data = await this.executeQuery(`${this.prometheusUrl}/metadata`, { + params: { metric } + }); - const data = response.data?.data?.[metric]?.[0]; + const metadata = data?.data?.[metric]?.[0]; return { name: metric, - help: data?.help, - type: data?.type + help: metadata?.help, + type: metadata?.type }; } catch (error) { console.error(`Error fetching metadata for ${metric}:`, error); - return { - name: metric - }; + return { name: metric }; } } async fetchMetricSeries(metric: string): Promise[]> { try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/series`, { - params: { 'match[]': metric } - }) - ); + const data = await this.executeQuery(`${this.prometheusUrl}/series`, { + 'match[]': metric + }); - return response.data.data || []; + return data.data || []; } catch (error) { console.error(`Error fetching series for ${metric}:`, error); return []; @@ -276,22 +253,20 @@ export class PrometheusService { async fetchAllMetrics(): Promise { try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/label/__name__/values`) - ); - return response.data.data; + const data = await this.executeQuery(`${this.prometheusUrl}/label/__name__/values`, {}); + return data.data; } catch (error) { console.error('Error fetching all metrics:', error); return []; } } - async fetchAllMetricsWithValues(): Promise { + async fetchAllMetricsWithValues(): Promise> { const metricNames = await this.fetchAllMetrics(); - const zvksMetrics = metricNames.filter(metric => - metric.startsWith('zvks') || - metric.includes('server_li') || - metric.includes('application_li') + const zvksMetrics = metricNames.filter(metric => + metric.startsWith('zvks') || + metric.includes('server_li') || + metric.includes('application_li') ); const promises = zvksMetrics.map(async (metric) => { @@ -308,7 +283,6 @@ export class PrometheusService { } clearCache(): void { - this.metricCache.clear(); - this.metadataCache.clear(); + this.cacheService.clearCache(); } } \ No newline at end of file