fixed WS
parent
911bfb88d1
commit
421d95565c
|
|
@ -2,16 +2,28 @@ class MetricsService {
|
|||
constructor() {
|
||||
this.baseUrl = '/metrics-ws';
|
||||
this.socket = null;
|
||||
this.subscriptions = new Map();
|
||||
this.pendingRequests = new Map();
|
||||
this.subscriptions = new Map(); // Хранит подписки на real-time данные
|
||||
this.pendingRequests = new Map(); // Для разовых запросов
|
||||
this.reconnectAttempts = 0;
|
||||
this.maxReconnectAttempts = 5;
|
||||
this.reconnectDelay = 5000;
|
||||
this.connectionCallbacks = new Set(); // Колбэки для событий подключения
|
||||
|
||||
window.addEventListener('beforeunload', () => this.cleanupAll());
|
||||
window.addEventListener('pagehide', () => this.cleanupAll());
|
||||
}
|
||||
|
||||
// Новый метод для отслеживания состояния подключения
|
||||
onConnectionChange(callback) {
|
||||
this.connectionCallbacks.add(callback);
|
||||
return () => this.connectionCallbacks.delete(callback);
|
||||
}
|
||||
|
||||
// Уведомление всех подписчиков о изменении состояния
|
||||
notifyConnectionChange(connected) {
|
||||
this.connectionCallbacks.forEach(cb => cb(connected));
|
||||
}
|
||||
|
||||
handleServerMessage(msg) {
|
||||
try {
|
||||
if (!msg || typeof msg !== 'object') {
|
||||
|
|
@ -22,25 +34,25 @@ class MetricsService {
|
|||
const { event, data, requestId } = msg;
|
||||
|
||||
switch (event) {
|
||||
case 'metrics-data':
|
||||
if (requestId && this.pendingRequests.has(requestId)) {
|
||||
const { resolve } = this.pendingRequests.get(requestId);
|
||||
resolve(data);
|
||||
this.pendingRequests.delete(requestId);
|
||||
} else {
|
||||
const metricKey = data.metric;
|
||||
const callbacks = this.subscriptions.get(metricKey) || [];
|
||||
callbacks.forEach(cb => cb(data));
|
||||
}
|
||||
|
||||
case 'connected':
|
||||
console.log('Server connection confirmed:', data);
|
||||
this.notifyConnectionChange(true);
|
||||
break;
|
||||
|
||||
case 'metrics-error':
|
||||
if (requestId && this.pendingRequests.has(requestId)) {
|
||||
const { reject } = this.pendingRequests.get(requestId);
|
||||
reject(new Error(data.error));
|
||||
this.pendingRequests.delete(requestId);
|
||||
}
|
||||
case 'realtime-data':
|
||||
this.handleRealtimeData(data, requestId);
|
||||
break;
|
||||
|
||||
case 'historical-data':
|
||||
this.handleHistoricalData(data, requestId);
|
||||
break;
|
||||
|
||||
case 'current-data':
|
||||
this.handleCurrentData(data, requestId);
|
||||
break;
|
||||
|
||||
case 'error':
|
||||
this.handleError(data, requestId);
|
||||
break;
|
||||
|
||||
default:
|
||||
|
|
@ -51,6 +63,54 @@ class MetricsService {
|
|||
}
|
||||
}
|
||||
|
||||
handleRealtimeData(data, requestId) {
|
||||
const { metric, filters, data: metricsData, type } = data;
|
||||
const metricKey = this.getMetricKey(metric, filters);
|
||||
|
||||
if (requestId && this.pendingRequests.has(requestId)) {
|
||||
// Это ответ на разовый запрос
|
||||
const { resolve } = this.pendingRequests.get(requestId);
|
||||
resolve(metricsData);
|
||||
this.pendingRequests.delete(requestId);
|
||||
} else {
|
||||
// Это обновление по подписке
|
||||
const callbacks = this.subscriptions.get(metricKey) || [];
|
||||
callbacks.forEach(cb => cb({
|
||||
data: metricsData,
|
||||
type: type || 'update',
|
||||
metric,
|
||||
filters,
|
||||
timestamp: Date.now()
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
handleHistoricalData(data, requestId) {
|
||||
if (requestId && this.pendingRequests.has(requestId)) {
|
||||
const { resolve } = this.pendingRequests.get(requestId);
|
||||
resolve(data.data || data);
|
||||
this.pendingRequests.delete(requestId);
|
||||
}
|
||||
}
|
||||
|
||||
handleCurrentData(data, requestId) {
|
||||
if (requestId && this.pendingRequests.has(requestId)) {
|
||||
const { resolve } = this.pendingRequests.get(requestId);
|
||||
resolve(data.data || data);
|
||||
this.pendingRequests.delete(requestId);
|
||||
}
|
||||
}
|
||||
|
||||
handleError(data, requestId) {
|
||||
if (requestId && this.pendingRequests.has(requestId)) {
|
||||
const { reject } = this.pendingRequests.get(requestId);
|
||||
reject(new Error(data.error || 'Unknown error'));
|
||||
this.pendingRequests.delete(requestId);
|
||||
} else {
|
||||
console.error('Server error:', data.error);
|
||||
}
|
||||
}
|
||||
|
||||
connectWebSocket() {
|
||||
if (this.socket && (this.socket.readyState === WebSocket.OPEN || this.socket.readyState === WebSocket.CONNECTING)) {
|
||||
return;
|
||||
|
|
@ -58,25 +118,27 @@ class MetricsService {
|
|||
|
||||
console.log('Connecting WebSocket...');
|
||||
this.socket = new WebSocket(this.baseUrl);
|
||||
this.notifyConnectionChange(false);
|
||||
|
||||
this.socket.addEventListener('open', () => {
|
||||
console.log('WebSocket connected');
|
||||
this.reconnectAttempts = 0;
|
||||
this.subscriptions.forEach((_, metricKey) => {
|
||||
const filters = this.parseFiltersFromKey(metricKey);
|
||||
const [metric] = metricKey.split('?');
|
||||
this.sendMessage('subscribe-metric', { metric, filters });
|
||||
});
|
||||
this.notifyConnectionChange(true);
|
||||
|
||||
// Переподписываемся на все активные подписки
|
||||
this.resubscribeAll();
|
||||
});
|
||||
|
||||
this.socket.addEventListener('close', () => {
|
||||
console.log('WebSocket disconnected');
|
||||
this.socket.addEventListener('close', (event) => {
|
||||
console.log('WebSocket disconnected', event.code, event.reason);
|
||||
this.socket = null;
|
||||
this.notifyConnectionChange(false);
|
||||
this.scheduleReconnect();
|
||||
});
|
||||
|
||||
this.socket.addEventListener('error', (err) => {
|
||||
console.error('WebSocket error:', err);
|
||||
this.notifyConnectionChange(false);
|
||||
});
|
||||
|
||||
this.socket.addEventListener('message', (event) => {
|
||||
|
|
@ -89,6 +151,18 @@ class MetricsService {
|
|||
});
|
||||
}
|
||||
|
||||
// Переподписка на все активные подписки после переподключения
|
||||
resubscribeAll() {
|
||||
this.subscriptions.forEach((_, metricKey) => {
|
||||
const { metric, filters } = this.parseMetricKey(metricKey);
|
||||
this.sendMessage('subscribe-realtime', {
|
||||
metric,
|
||||
filters,
|
||||
interval: 10000 // Дефолтный интервал
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
scheduleReconnect() {
|
||||
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
|
||||
console.warn('Max reconnect attempts reached');
|
||||
|
|
@ -104,12 +178,13 @@ class MetricsService {
|
|||
}, delay);
|
||||
}
|
||||
|
||||
sendMessage(event, data) {
|
||||
sendMessage(event, data, requestId) {
|
||||
if (!this.socket || this.socket.readyState !== WebSocket.OPEN) {
|
||||
if (this.socket && this.socket.readyState === WebSocket.CONNECTING) {
|
||||
// Ждем открытия соединения
|
||||
const waitForOpen = () => {
|
||||
if (this.socket.readyState === WebSocket.OPEN) {
|
||||
this.socket.send(JSON.stringify({ event, data }));
|
||||
this.doSendMessage(event, data, requestId);
|
||||
} else if (this.socket.readyState === WebSocket.CONNECTING) {
|
||||
setTimeout(waitForOpen, 100);
|
||||
}
|
||||
|
|
@ -118,29 +193,77 @@ class MetricsService {
|
|||
} else {
|
||||
console.warn('WebSocket not connected, cannot send:', event);
|
||||
this.connectWebSocket();
|
||||
// Сохраняем сообщение для отправки после подключения
|
||||
setTimeout(() => {
|
||||
if (this.socket?.readyState === WebSocket.OPEN) {
|
||||
this.doSendMessage(event, data, requestId);
|
||||
}
|
||||
}, 1000);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
this.socket.send(JSON.stringify({ event, data }));
|
||||
this.doSendMessage(event, data, requestId);
|
||||
}
|
||||
|
||||
async fetchMetricsRange(metric, start, end, step = 15, filters = {}) {
|
||||
doSendMessage(event, data, requestId) {
|
||||
const message = requestId ? { event, data, requestId } : { event, data };
|
||||
this.socket.send(JSON.stringify(message));
|
||||
}
|
||||
|
||||
// ============ ПУБЛИЧНЫЕ МЕТОДЫ ============
|
||||
|
||||
// Подписка на real-time данные
|
||||
subscribeToMetric(metric, filters = {}, callback, interval = 10000) {
|
||||
this.connectWebSocket();
|
||||
|
||||
const metricKey = this.getMetricKey(metric, filters);
|
||||
|
||||
if (!this.subscriptions.has(metricKey)) {
|
||||
this.subscriptions.set(metricKey, []);
|
||||
|
||||
this.sendMessage('subscribe-realtime', {
|
||||
metric,
|
||||
filters,
|
||||
interval
|
||||
});
|
||||
}
|
||||
|
||||
const callbacks = this.subscriptions.get(metricKey);
|
||||
callbacks.push(callback);
|
||||
|
||||
// Возвращаем функцию для отписки
|
||||
return () => this.unsubscribeFromMetric(metric, filters, callback);
|
||||
}
|
||||
|
||||
// Отписка от real-time данных
|
||||
unsubscribeFromMetric(metric, filters = {}, callback) {
|
||||
const metricKey = this.getMetricKey(metric, filters);
|
||||
const callbacks = this.subscriptions.get(metricKey) || [];
|
||||
const filtered = callbacks.filter(cb => cb !== callback);
|
||||
|
||||
if (filtered.length === 0) {
|
||||
this.subscriptions.delete(metricKey);
|
||||
this.sendMessage('unsubscribe-realtime', { metric, filters });
|
||||
} else {
|
||||
this.subscriptions.set(metricKey, filtered);
|
||||
}
|
||||
}
|
||||
|
||||
// Запрос исторических данных (разовый)
|
||||
async fetchMetricsRange(metric, start, end, step = 60, filters = {}) {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.connectWebSocket();
|
||||
const requestId = `range-${Date.now()}`;
|
||||
const requestId = `historical-${Date.now()}-${Math.random().toString(36).slice(2)}`;
|
||||
|
||||
// Таймаут для очистки
|
||||
const timeout = setTimeout(() => {
|
||||
reject(new Error('Request timeout'));
|
||||
reject(new Error('Historical data request timeout'));
|
||||
this.pendingRequests.delete(requestId);
|
||||
}, 12000);
|
||||
}, 30000); // 30 секунд таймаут для historical данных
|
||||
|
||||
this.pendingRequests.set(requestId, {
|
||||
resolve: (responseData) => {
|
||||
resolve: (data) => {
|
||||
clearTimeout(timeout);
|
||||
const data = Array.isArray(responseData) ? responseData :
|
||||
(responseData?.data || []);
|
||||
resolve(data);
|
||||
},
|
||||
reject: (err) => {
|
||||
|
|
@ -149,64 +272,109 @@ class MetricsService {
|
|||
}
|
||||
});
|
||||
|
||||
this.sendMessage('get-metrics', {
|
||||
metric, start, end, step, filters, isRangeQuery: true, requestId
|
||||
});
|
||||
this.sendMessage('get-historical', {
|
||||
metric,
|
||||
start: Math.floor(start / 1000) * 1000, // Ensure milliseconds
|
||||
end: Math.floor(end / 1000) * 1000,
|
||||
step,
|
||||
filters
|
||||
}, requestId);
|
||||
});
|
||||
}
|
||||
|
||||
subscribeToMetric(metricKey, callback, interval = 5000, filters = {}) {
|
||||
// Запрос текущих данных (разовый)
|
||||
async fetchCurrentMetrics(metric, filters = {}) {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.connectWebSocket();
|
||||
const requestId = `current-${Date.now()}-${Math.random().toString(36).slice(2)}`;
|
||||
|
||||
if (!this.subscriptions.has(metricKey)) {
|
||||
this.subscriptions.set(metricKey, []);
|
||||
const [metric] = metricKey.split('?');
|
||||
this.sendMessage('subscribe-metric', { metric, interval, filters });
|
||||
const timeout = setTimeout(() => {
|
||||
reject(new Error('Current data request timeout'));
|
||||
this.pendingRequests.delete(requestId);
|
||||
}, 10000); // 10 секунд таймаут
|
||||
|
||||
this.pendingRequests.set(requestId, {
|
||||
resolve: (data) => {
|
||||
clearTimeout(timeout);
|
||||
resolve(data);
|
||||
},
|
||||
reject: (err) => {
|
||||
clearTimeout(timeout);
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
this.sendMessage('get-current', {
|
||||
metric,
|
||||
filters
|
||||
}, requestId);
|
||||
});
|
||||
}
|
||||
|
||||
const callbacks = this.subscriptions.get(metricKey);
|
||||
callbacks.push(callback);
|
||||
|
||||
return () => this.unsubscribeFromMetric(metricKey, callback);
|
||||
// Отписка от всех подписок
|
||||
unsubscribeAll() {
|
||||
this.sendMessage('unsubscribe-all', {});
|
||||
this.subscriptions.clear();
|
||||
}
|
||||
|
||||
unsubscribeFromMetric(metricKey, callback) {
|
||||
const callbacks = this.subscriptions.get(metricKey) || [];
|
||||
const filtered = callbacks.filter(cb => cb !== callback);
|
||||
// ============ ВСПОМОГАТЕЛЬНЫЕ МЕТОДЫ ============
|
||||
|
||||
if (filtered.length === 0) {
|
||||
this.subscriptions.delete(metricKey);
|
||||
const [metric] = metricKey.split('?');
|
||||
this.sendMessage('unsubscribe-metric', { metric });
|
||||
} else {
|
||||
this.subscriptions.set(metricKey, filtered);
|
||||
}
|
||||
getMetricKey(metric, filters) {
|
||||
const sortedKeys = Object.keys(filters).sort();
|
||||
const filterString = sortedKeys
|
||||
.map(key => `${key}=${encodeURIComponent(filters[key])}`)
|
||||
.join('&');
|
||||
|
||||
return filterString ? `${metric}?${filterString}` : metric;
|
||||
}
|
||||
|
||||
parseFiltersFromKey(metricKey) {
|
||||
const parts = metricKey.split('?');
|
||||
if (parts.length < 2) return {};
|
||||
return parts[1].split('&').reduce((acc, pair) => {
|
||||
parseMetricKey(metricKey) {
|
||||
const [metric, query] = metricKey.split('?');
|
||||
const filters = {};
|
||||
|
||||
if (query) {
|
||||
query.split('&').forEach(pair => {
|
||||
const [key, value] = pair.split('=');
|
||||
if (key && value) acc[key] = value;
|
||||
return acc;
|
||||
}, {});
|
||||
if (key && value) {
|
||||
filters[decodeURIComponent(key)] = decodeURIComponent(value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return { metric, filters };
|
||||
}
|
||||
|
||||
cleanupAll() {
|
||||
this.sendMessage('unsubscribe-all', {});
|
||||
this.subscriptions.clear();
|
||||
this.unsubscribeAll();
|
||||
this.disconnectWebSocket();
|
||||
}
|
||||
|
||||
disconnectWebSocket() {
|
||||
if (this.socket) {
|
||||
this.socket.close();
|
||||
this.socket.close(1000, 'Client disconnected');
|
||||
this.socket = null;
|
||||
}
|
||||
this.notifyConnectionChange(false);
|
||||
}
|
||||
|
||||
// Проверка состояния подключения
|
||||
isConnected() {
|
||||
return this.socket?.readyState === WebSocket.OPEN;
|
||||
}
|
||||
|
||||
// Получение текущего состояния
|
||||
getConnectionState() {
|
||||
return this.socket ? this.socket.readyState : WebSocket.CLOSED;
|
||||
}
|
||||
}
|
||||
|
||||
// Создаем глобальный экземпляр
|
||||
const metricsService = new MetricsService();
|
||||
|
||||
// Экспорт для использования в модульной системе
|
||||
export default metricsService;
|
||||
|
||||
// Глобальный экспорт для прямого использования в браузере
|
||||
if (typeof window !== 'undefined') {
|
||||
window.MetricsService = metricsService;
|
||||
}
|
||||
|
|
@ -34,6 +34,8 @@ const PrometheusChart = ({ metricInfo, chartHeight = 580 }) => {
|
|||
const TIME_WINDOW_MS = 3600 * 1000;
|
||||
|
||||
|
||||
// Эта функция может больше не понадобиться, так как
|
||||
// сервис сам генерирует ключи, но оставьте для совместимости
|
||||
const getSubscriptionKey = () => {
|
||||
const filterParts = [];
|
||||
if (device) filterParts.push(`device=${encodeURIComponent(device)}`);
|
||||
|
|
@ -120,10 +122,12 @@ const PrometheusChart = ({ metricInfo, chartHeight = 580 }) => {
|
|||
};
|
||||
|
||||
const step = calculateStep(start, end);
|
||||
|
||||
// Используем новый метод для исторических данных
|
||||
const data = await metricsService.fetchMetricsRange(
|
||||
metricName,
|
||||
Math.floor(start.getTime() / 1000),
|
||||
Math.floor(end.getTime() / 1000),
|
||||
start.getTime(), // Теперь передаем timestamp в миллисекундах
|
||||
end.getTime(),
|
||||
step,
|
||||
extendedFilters
|
||||
);
|
||||
|
|
@ -132,7 +136,7 @@ const PrometheusChart = ({ metricInfo, chartHeight = 580 }) => {
|
|||
.sort((a, b) => a.timestamp - b.timestamp);
|
||||
|
||||
const limitedData = formattedData.length > MAX_POINTS
|
||||
? formattedData.slice(-MAX_POINTS)
|
||||
? downsampleData(formattedData, MAX_POINTS)
|
||||
: formattedData;
|
||||
|
||||
if (limitedData.length > 0) {
|
||||
|
|
@ -163,12 +167,15 @@ const PrometheusChart = ({ metricInfo, chartHeight = 580 }) => {
|
|||
|
||||
fetchHistoricalData(start, end).finally(() => setIsLoading(false));
|
||||
|
||||
// Изменяем параметры подписки
|
||||
return metricsService.subscribeToMetric(
|
||||
getSubscriptionKey(),
|
||||
(newData) => {
|
||||
console.log('Received WS update:', newData);
|
||||
if (!Array.isArray(newData)) {
|
||||
console.error('Expected array in WS update, got:', typeof newData);
|
||||
metricName, // Теперь передаем просто имя метрики
|
||||
{ ...filters, device, source_id }, // Фильры отдельным параметром
|
||||
(update) => { // Колбэк получает объект с данными
|
||||
console.log('Received WS update:', update);
|
||||
|
||||
if (!update || !Array.isArray(update.data)) {
|
||||
console.error('Invalid update format:', update);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -176,7 +183,7 @@ const PrometheusChart = ({ metricInfo, chartHeight = 580 }) => {
|
|||
const now = Date.now();
|
||||
const cutoffTime = now - TIME_WINDOW_MS;
|
||||
|
||||
const formattedNew = formatMetricData(newData)
|
||||
const formattedNew = formatMetricData(update.data)
|
||||
.filter(point => point.timestamp >= cutoffTime);
|
||||
|
||||
const filteredPrev = prev.filter(point =>
|
||||
|
|
@ -194,15 +201,18 @@ const PrometheusChart = ({ metricInfo, chartHeight = 580 }) => {
|
|||
: merged;
|
||||
});
|
||||
},
|
||||
1000,
|
||||
{ ...filters, device, source_id }
|
||||
5000 // Интервал обновления (можно настроить)
|
||||
);
|
||||
};
|
||||
|
||||
|
||||
const stopRealtimeUpdates = () => {
|
||||
setIsLiveUpdating(false);
|
||||
metricsService.unsubscribeFromMetric(getSubscriptionKey());
|
||||
// Теперь отписываемся по метрике и фильтрам
|
||||
metricsService.unsubscribeFromMetric(
|
||||
metricName,
|
||||
{ ...filters, device, source_id }
|
||||
);
|
||||
};
|
||||
|
||||
const handleCustomRangeApply = () => {
|
||||
|
|
@ -215,6 +225,7 @@ const PrometheusChart = ({ metricInfo, chartHeight = 580 }) => {
|
|||
console.log('Metric changed:', { metricName, device, source_id, filters });
|
||||
|
||||
let unsubscribe;
|
||||
|
||||
const init = async () => {
|
||||
if (mode === 'realtime') {
|
||||
unsubscribe = startRealtimeUpdates();
|
||||
|
|
@ -226,10 +237,14 @@ const PrometheusChart = ({ metricInfo, chartHeight = 580 }) => {
|
|||
init();
|
||||
|
||||
return () => {
|
||||
if (unsubscribe) unsubscribe();
|
||||
stopRealtimeUpdates();
|
||||
if (unsubscribe) {
|
||||
unsubscribe(); // Вызываем функцию отписки
|
||||
}
|
||||
if (mode === 'realtime') {
|
||||
stopRealtimeUpdates(); // Дополнительная очистка
|
||||
}
|
||||
};
|
||||
}, [mode, metricName, device, source_id, filters]);
|
||||
}, [mode, metricName, device, source_id, JSON.stringify(filters)]); // Добавляем JSON.stringify для фильтров
|
||||
|
||||
const metaInfo = [
|
||||
metricMeta.instance && `Instance: ${metricMeta.instance}`,
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ export default defineConfig({
|
|||
},
|
||||
'/api': {
|
||||
target: 'http://localhost:3000',
|
||||
ws: true,
|
||||
changeOrigin: true,
|
||||
bypass(req, res, options) {
|
||||
console.log('Proxying request:', req.url);
|
||||
|
|
|
|||
Loading…
Reference in New Issue