165 lines
6.6 KiB
Python
165 lines
6.6 KiB
Python
"""Модуль для работы с JSON-контейнерами на веб-страницах.
|
||
|
||
Содержит компонент для чтения и проверки JSON-данных в контейнерах.
|
||
Использует playwright для взаимодействия с элементами страницы.
|
||
"""
|
||
|
||
import json
|
||
import re
|
||
from typing import Any, Dict
|
||
import jsondiff
|
||
from playwright.sync_api import Page
|
||
from tools.logger import get_logger
|
||
from components.base_component import BaseComponent
|
||
|
||
logger = get_logger("JSON_CONTAINER")
|
||
|
||
|
||
class JsonContainerComponent(BaseComponent):
|
||
"""Компонент для работы с JSON-данными на странице.
|
||
|
||
Предоставляет методы чтения и проверки JSON-данных в контейнерах.
|
||
"""
|
||
|
||
def __init__(self, page: Page) -> None:
|
||
"""Инициализирует JSON-контейнер.
|
||
|
||
Args:
|
||
page: Экземпляр страницы Playwright.
|
||
"""
|
||
self.page = page
|
||
|
||
def format_json_string(self, json_string: str) -> str:
|
||
"""Форматирует строку JSON для корректного парсинга.
|
||
|
||
Args:
|
||
json_string: Сырая строка с JSON-данными.
|
||
|
||
Returns:
|
||
str: Отформатированная строка JSON.
|
||
"""
|
||
lines = json_string.splitlines()
|
||
formatted_lines = []
|
||
stack = [] # Стек для отслеживания вложенности
|
||
current_indent = 0
|
||
|
||
for line in lines:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# Определяем тип текущей строки
|
||
if line in ['{', '[']:
|
||
formatted_lines.append(' ' * current_indent + line)
|
||
stack.append(line)
|
||
current_indent += 1
|
||
elif line in ['}', ']']:
|
||
current_indent -= 1
|
||
if stack and stack[-1] in ['{', '[']:
|
||
stack.pop()
|
||
formatted_lines.append(' ' * current_indent + line)
|
||
elif re.match(r'^\d+:\{', line):
|
||
formatted_lines.append(' ' * current_indent + '{')
|
||
stack.append('{')
|
||
current_indent += 1
|
||
elif ':' in line:
|
||
key, value = line.split(':', 1)
|
||
key = key.strip()
|
||
value = value.strip()
|
||
|
||
if not (key.startswith('"') and key.endswith('"')):
|
||
key = f'"{key}"'
|
||
|
||
if value in ['{', '[']:
|
||
formatted_line = f'{key}: {value}'
|
||
formatted_lines.append(' ' * current_indent + formatted_line)
|
||
stack.append(value)
|
||
current_indent += 1
|
||
elif value in ['}', ']']:
|
||
current_indent -= 1
|
||
formatted_line = f'{key}: {value}'
|
||
formatted_lines.append(' ' * current_indent + formatted_line)
|
||
if stack:
|
||
stack.pop()
|
||
else:
|
||
if (value and not value.isdigit() and
|
||
not value.replace('.', '', 1).isdigit() and
|
||
value not in ['true', 'false', 'null'] and
|
||
not value.startswith('"') and not value.endswith('"') and
|
||
not value.startswith('{') and not value.startswith('[')):
|
||
value = f'"{value}"'
|
||
|
||
formatted_line = f'{key}: {value}'
|
||
formatted_lines.append(' ' * current_indent + formatted_line)
|
||
else:
|
||
formatted_lines.append(' ' * current_indent + line)
|
||
|
||
# Добавляем запятые где необходимо
|
||
result = []
|
||
total_lines = len(formatted_lines)
|
||
|
||
for i, current_line in enumerate(formatted_lines):
|
||
if i < total_lines - 1:
|
||
next_line = formatted_lines[i + 1]
|
||
in_array = any(bracket == '[' for bracket in stack)
|
||
|
||
# Упрощенная проверка условий для запятой
|
||
no_comma_condition1 = current_line.endswith(('{', '[', ','))
|
||
no_comma_condition2 = next_line.strip().endswith(('}', ']'))
|
||
no_comma_condition3 = next_line.strip().startswith(('}', ']'))
|
||
no_comma_condition4 = in_array and next_line.strip() == ']'
|
||
|
||
should_add_comma = not (no_comma_condition1 or no_comma_condition2 or
|
||
no_comma_condition3 or no_comma_condition4)
|
||
|
||
# Специальный случай для элементов массива
|
||
if (in_array and current_line.strip() == '}' and
|
||
next_line.strip() != ']' and not next_line.strip().startswith('}')):
|
||
should_add_comma = True
|
||
|
||
if should_add_comma:
|
||
current_line += ','
|
||
|
||
result.append(current_line)
|
||
|
||
return '\n'.join(result)
|
||
|
||
def read_data(self, locator: Any) -> Dict:
|
||
"""Читает и форматирует JSON-данные из указанного локатора.
|
||
|
||
Args:
|
||
locator: Локатор элемента с JSON-данными.
|
||
|
||
Returns:
|
||
dict: Распарсенный JSON-объект.
|
||
|
||
Raises:
|
||
json.JSONDecodeError: Если данные не могут быть преобразованы в JSON.
|
||
"""
|
||
loc = self.get_locator(locator)
|
||
json_string = loc.inner_text()
|
||
formatted_json_string = self.format_json_string(json_string)
|
||
|
||
try:
|
||
data = json.loads(formatted_json_string)
|
||
except json.JSONDecodeError as e:
|
||
logger.error("JSON decode error: %s", e)
|
||
logger.error("Formatted JSON: %s", formatted_json_string)
|
||
assert False, f"Invalid json content. Error: {e}"
|
||
|
||
return data
|
||
|
||
def check_json_equals(self, actual: Any, expected: Any, msg: str) -> None:
|
||
"""Сравнивает JSON-объекты на идентичность.
|
||
|
||
Args:
|
||
actual: Фактический JSON-объект.
|
||
expected: Ожидаемый JSON-объект.
|
||
msg: Сообщение об ошибке.
|
||
|
||
Raises:
|
||
AssertionError: Если объекты не идентичны.
|
||
"""
|
||
diff = jsondiff.diff(expected, actual, syntax='symmetric')
|
||
assert len(diff) == 0, f"{msg}. DIFF is {diff}"
|