feat(core): 在非调试模式下自动记录错误信息并保存

This commit is contained in:
XcantloadX 2025-01-22 21:22:32 +08:00
parent 7cc551e6d6
commit 9836f5b1cc
3 changed files with 64 additions and 17 deletions

1
.gitignore vendored
View File

@ -6,6 +6,7 @@ kotonebot-ui/node_modules
kotonebot-ui/.vite
dumps*/
config.json
reports/
##########################
# Byte-compiled / optimized / DLL files

View File

@ -1,19 +1,25 @@
import os
import zipfile
import pkgutil
import logging
import importlib
import traceback
import threading
from datetime import datetime
from typing import Callable, Optional, Any, Literal
from dataclasses import dataclass, field
import cv2
from kotonebot.backend.context import init_context
from kotonebot.backend.core import task_registry, action_registry, Task, Action
from kotonebot.backend.core import task_registry, action_registry, current_callstack, Task, Action
logger = logging.getLogger(__name__)
@dataclass
class TaskStatus:
task: Task
status: Literal['pending', 'running', 'finished', 'error']
status: Literal['pending', 'running', 'finished', 'error', 'cancelled']
@dataclass
class RunStatus:
@ -46,18 +52,51 @@ def initialize(module: str):
logger.info('Tasks and actions initialized.')
logger.info(f'{len(task_registry)} task(s) and {len(action_registry)} action(s) loaded.')
def _save_error_report(
exception: Exception,
*,
path: str | None = None
) -> str:
"""
保存错误报告
:param path: 保存的路径若为 `None`则保存到 `./reports/{YY-MM-DD HH-MM-SS}.zip`
:return: 保存的路径
"""
from kotonebot import device
if path is None:
path = f'./reports/{datetime.now().strftime("%Y-%m-%d %H-%M-%S")}.zip'
exception_msg = '\n'.join(traceback.format_exception(exception))
task_callstack = '\n'.join([f'{i+1}. name={task.name} priority={task.priority}' for i, task in enumerate(current_callstack)])
screenshot = device.screenshot()
with open('config.json', 'r') as f:
config_content = f.read()
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
with zipfile.ZipFile(path, 'w') as zipf:
zipf.writestr('exception.txt', exception_msg)
zipf.writestr('task_callstack.txt', task_callstack)
zipf.writestr('screenshot.png', cv2.imencode('.png', screenshot)[1].tobytes())
zipf.writestr('config.json', config_content)
return path
def run(
*,
debug: bool = False,
resume_on_error: bool = False,
config_type: type = dict[str, Any],
no_try: bool = False,
on_finished: Optional[Callable[[], None]] = None,
on_task_status_changed: Optional[Callable[[Task, Literal['pending', 'running', 'finished', 'error']], None]] = None,
on_task_status_changed: Optional[Callable[[Task, Literal['pending', 'running', 'finished', 'error', 'cancelled']], None]] = None,
on_task_error: Optional[Callable[[Task, Exception], None]] = None,
auto_save_error_report: bool = True,
):
"""
按优先级顺序运行所有任务
:param no_try: 是否不捕获异常
:param debug: 是否为调试模式调试模式下不捕获异常不保存错误报告默认为 `False`
:param resume_on_error: 是否在任务出错时继续运行默认为 `False`
:param auto_save_error_report: 是否自动保存错误报告默认 `True`
"""
init_context(config_type=config_type)
@ -71,18 +110,26 @@ def run(
if on_task_status_changed:
on_task_status_changed(task, 'running')
if no_try:
if debug:
task.func()
else:
try:
task.func()
if on_task_status_changed:
on_task_status_changed(task, 'finished')
except Exception:
except Exception as e:
logger.error(f'Task failed: {task.name}')
logger.exception(f'Error: ')
report_path = None
if auto_save_error_report:
report_path = _save_error_report(e)
if on_task_status_changed:
on_task_status_changed(task, 'error')
if not resume_on_error:
for task1 in tasks[tasks.index(task)+1:]:
if on_task_status_changed:
on_task_status_changed(task1, 'cancelled')
break
logger.info(f'Task finished: {task.name}')
logger.info('All tasks finished.')
if on_finished:
@ -90,7 +137,8 @@ def run(
def start(
*,
no_try: bool = False,
debug: bool = False,
resume_on_error: bool = False,
config_type: type = dict[str, Any],
) -> RunStatus:
run_status = RunStatus(running=True)
@ -106,15 +154,12 @@ def start(
raise ValueError(f'Task {task.name} not found in run_status.tasks')
if status == 'pending':
run_status.tasks.append(TaskStatus(task=task, status='pending'))
elif status == 'running':
_find(task).status = 'running'
elif status == 'finished':
_find(task).status = 'finished'
elif status == 'error':
_find(task).status = 'error'
else:
_find(task).status = status
thread = threading.Thread(target=run, kwargs={
'config_type': config_type,
'no_try': no_try,
'debug': debug,
'resume_on_error': resume_on_error,
'on_finished': _on_finished,
'on_task_status_changed': _on_task_status_changed,
})
@ -128,6 +173,6 @@ if __name__ == '__main__':
logging.getLogger('kotonebot').setLevel(logging.DEBUG)
init_context(config_type=BaseConfig)
initialize('kotonebot.tasks')
run(no_try=True)
run(debug=True)

View File

@ -73,7 +73,8 @@ class KotoneBotUI:
'pending': '等待中',
'running': '运行中',
'finished': '已完成',
'error': '出错'
'error': '出错',
'cancelled': '已取消'
}.get(task_status.status, '未知')
status_list.append([task_status.task.name, status_text])
return status_list