refactor(core): 移除原有 kotonebot.run 模块中函数,改用 Kotonebot 类

This commit is contained in:
XcantloadX 2025-03-03 19:39:19 +08:00
parent 7907e448b7
commit 508d81cb3c
8 changed files with 254 additions and 230 deletions

1
.gitignore vendored
View File

@ -12,6 +12,7 @@ reports/
tmp/
res/sprites_compiled/
messages/
logs/
##########################
# Byte-compiled / optimized / DLL files

View File

@ -9,5 +9,7 @@
"test*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
"python.testing.unittestEnabled": true,
"basedpyright.analysis.typeCheckingMode": "basic",
"python.languageServer": "Default"
}

242
kotonebot/backend/bot.py Normal file
View File

@ -0,0 +1,242 @@
import logging
import pkgutil
import importlib
from typing_extensions import Self
from dataclasses import dataclass, field
import threading
import traceback
import os
import zipfile
import cv2
from datetime import datetime
import io
from typing import Any, Literal, Callable, Generic, TypeVar, ParamSpec
from kotonebot.backend.context import Task, Action
from kotonebot.backend.context import init_context, vars
from kotonebot.backend.context import task_registry, action_registry, current_callstack, Task, Action
log_stream = io.StringIO()
stream_handler = logging.StreamHandler(log_stream)
stream_handler.setFormatter(logging.Formatter('[%(asctime)s] [%(levelname)s] [%(name)s] [%(filename)s:%(lineno)d] - %(message)s'))
logging.getLogger('kotonebot').addHandler(stream_handler)
logger = logging.getLogger(__name__)
@dataclass
class TaskStatus:
task: Task
status: Literal['pending', 'running', 'finished', 'error', 'cancelled']
@dataclass
class RunStatus:
running: bool = False
tasks: list[TaskStatus] = field(default_factory=list)
current_task: Task | None = None
callstack: list[Task | Action] = field(default_factory=list)
def interrupt(self):
vars.interrupted.set()
def _save_error_report(
exception: Exception,
*,
path: str | None = None
) -> str:
"""
保存错误报告
:param path: 保存的路径若为 `None`则保存到 `./reports/{YY-MM-DD HH-MM-SS}.zip`
:return: 保存的路径
"""
from kotonebot import device
try:
if path is None:
path = f'./reports/{datetime.now().strftime("%Y-%m-%d %H-%M-%S")}.zip'
exception_msg = '\n'.join(traceback.format_exception(exception))
task_callstack = '\n'.join([f'{i+1}. name={task.name} priority={task.priority}' for i, task in enumerate(current_callstack)])
screenshot = device.screenshot()
logs = log_stream.getvalue()
with open('config.json', 'r', encoding='utf-8') as f:
config_content = f.read()
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
with zipfile.ZipFile(path, 'w') as zipf:
zipf.writestr('exception.txt', exception_msg)
zipf.writestr('task_callstack.txt', task_callstack)
zipf.writestr('screenshot.png', cv2.imencode('.png', screenshot)[1].tobytes())
zipf.writestr('config.json', config_content)
zipf.writestr('logs.txt', logs)
return path
except Exception as e:
logger.exception(f'Failed to save error report:')
return ''
# Modified from https://stackoverflow.com/questions/70982565/how-do-i-make-an-event-listener-with-decorators-in-python
Params = ParamSpec('Params')
Return = TypeVar('Return')
class Event(Generic[Params, Return]):
def __init__(self):
self.__listeners = []
@property
def on(self):
def wrapper(func: Callable[Params, Return]):
self.add_listener(func)
return func
return wrapper
def add_listener(self, func: Callable[Params, Return]) -> None:
if func in self.__listeners:
return
self.__listeners.append(func)
def remove_listener(self, func: Callable[Params, Return]) -> None:
if func not in self.__listeners:
return
self.__listeners.remove(func)
def __iadd__(self, func: Callable[Params, Return]) -> Self:
self.add_listener(func)
return self
def __isub__(self, func: Callable[Params, Return]) -> Self:
self.remove_listener(func)
return self
def trigger(self, *args: Params.args, **kwargs: Params.kwargs) -> None:
for func in self.__listeners:
func(*args, **kwargs)
class KotoneBotEvents:
def __init__(self):
self.task_status_changed = Event[
[Task, Literal['pending', 'running', 'finished', 'error', 'cancelled']], None
]()
self.task_error = Event[
[Task, Exception], None
]()
self.finished = Event[[], None]()
class KotoneBot:
def __init__(
self,
module: str,
config_type: type = dict[str, Any],
*,
debug: bool = False,
resume_on_error: bool = False,
auto_save_error_report: bool = True,
):
"""
初始化 KotoneBot
:param module: 主模块名此模块及其所有子模块都会被载入
:param config_type: 配置类型
:param debug: 调试模式
:param resume_on_error: 在错误时是否恢复
:param auto_save_error_report: 是否自动保存错误报告
"""
self.module = module
self.config_type = config_type
self.debug = debug
self.resume_on_error = resume_on_error
self.auto_save_error_report = auto_save_error_report
self.events = KotoneBotEvents()
def initialize(self):
"""
初始化并载入所有任务和动作
"""
logger.info('Initializing tasks and actions...')
logger.debug(f'Loading module: {self.module}')
# 加载主模块
importlib.import_module(self.module)
# 加载所有子模块
pkg = importlib.import_module(self.module)
for loader, name, is_pkg in pkgutil.walk_packages(pkg.__path__, pkg.__name__ + '.'):
logger.debug(f'Loading sub-module: {name}')
try:
importlib.import_module(name)
except Exception as e:
logger.error(f'Failed to load sub-module: {name}')
logger.exception(f'Error: ')
logger.info('Tasks and actions initialized.')
logger.info(f'{len(task_registry)} task(s) and {len(action_registry)} action(s) loaded.')
def run(self, tasks: list[Task], *, by_priority: bool = True):
"""
按优先级顺序运行所有任务
"""
init_context(config_type=self.config_type)
if by_priority:
tasks = sorted(tasks, key=lambda x: x.priority, reverse=True)
for task in tasks:
self.events.task_status_changed.trigger(task, 'pending')
for task in tasks:
logger.info(f'Task started: {task.name}')
self.events.task_status_changed.trigger(task, 'running')
if self.debug:
task.func()
else:
try:
task.func()
self.events.task_status_changed.trigger(task, 'finished')
# 用户中止
except KeyboardInterrupt as e:
logger.exception('Keyboard interrupt detected.')
for task1 in tasks[tasks.index(task):]:
self.events.task_status_changed.trigger(task1, 'cancelled')
vars.interrupted.clear()
break
# 其他错误
except Exception as e:
logger.error(f'Task failed: {task.name}')
logger.exception(f'Error: ')
report_path = None
if self.auto_save_error_report:
report_path = _save_error_report(e)
self.events.task_status_changed.trigger(task, 'error')
if not self.resume_on_error:
for task1 in tasks[tasks.index(task)+1:]:
self.events.task_status_changed.trigger(task1, 'cancelled')
break
logger.info(f'Task finished: {task.name}')
logger.info('All tasks finished.')
self.events.finished.trigger()
def run_all(self) -> None:
return self.run(list(task_registry.values()), by_priority=True)
def start_all(self) -> RunStatus:
run_status = RunStatus(running=True)
def _on_finished():
run_status.running = False
run_status.current_task = None
run_status.callstack = []
self.events.finished -= _on_finished
self.events.task_status_changed -= _on_task_status_changed
def _on_task_status_changed(task: Task, status: Literal['pending', 'running', 'finished', 'error', 'cancelled']):
def _find(task: Task) -> TaskStatus:
for task_status in run_status.tasks:
if task_status.task == task:
return task_status
raise ValueError(f'Task {task.name} not found in run_status.tasks')
if status == 'pending':
run_status.tasks.append(TaskStatus(task=task, status='pending'))
else:
_find(task).status = status
self.events.task_status_changed += _on_task_status_changed
self.events.finished += _on_finished
thread = threading.Thread(target=self.run_all)
thread.start()
return run_status

View File

@ -45,7 +45,7 @@ class AdbRawImpl(AdbImpl):
self.__process = None
if self.__worker:
try:
self.__worker.join(timeout=2)
self.__worker.join()
except:
pass
self.__worker = None

View File

@ -1,2 +0,0 @@
# kotonebot.run
此文件夹下存放调度脚本,负责执行所有的任务。

View File

@ -1,220 +0,0 @@
import os
import io
import zipfile
import pkgutil
import logging
import importlib
import traceback
import threading
from datetime import datetime
from typing import Callable, Optional, Any, Literal
from dataclasses import dataclass, field
import cv2
from kotonebot.backend.context import init_context, vars
from kotonebot.backend.context import task_registry, action_registry, current_callstack, Task, Action
log_stream = io.StringIO()
stream_handler = logging.StreamHandler(log_stream)
stream_handler.setFormatter(logging.Formatter('[%(asctime)s] [%(levelname)s] [%(name)s] [%(filename)s:%(lineno)d] - %(message)s'))
logging.getLogger('kotonebot').addHandler(stream_handler)
logger = logging.getLogger(__name__)
@dataclass
class TaskStatus:
task: Task
status: Literal['pending', 'running', 'finished', 'error', 'cancelled']
@dataclass
class RunStatus:
running: bool = False
tasks: list[TaskStatus] = field(default_factory=list)
current_task: Task | None = None
callstack: list[Task | Action] = field(default_factory=list)
def interrupt(self):
vars.interrupted.set()
def initialize(module: str):
"""
初始化并载入所有任务和动作
:param module: 主模块名此模块及其所有子模块都会被载入
"""
logger.info('Initializing tasks and actions...')
logger.debug(f'Loading module: {module}')
# 加载主模块
importlib.import_module(module)
# 加载所有子模块
pkg = importlib.import_module(module)
for loader, name, is_pkg in pkgutil.walk_packages(pkg.__path__, pkg.__name__ + '.'):
logger.debug(f'Loading sub-module: {name}')
try:
importlib.import_module(name)
except Exception as e:
logger.error(f'Failed to load sub-module: {name}')
logger.exception(f'Error: ')
logger.info('Tasks and actions initialized.')
logger.info(f'{len(task_registry)} task(s) and {len(action_registry)} action(s) loaded.')
def _save_error_report(
exception: Exception,
*,
path: str | None = None
) -> str:
"""
保存错误报告
:param path: 保存的路径若为 `None`则保存到 `./reports/{YY-MM-DD HH-MM-SS}.zip`
:return: 保存的路径
"""
from kotonebot import device
try:
if path is None:
path = f'./reports/{datetime.now().strftime("%Y-%m-%d %H-%M-%S")}.zip'
exception_msg = '\n'.join(traceback.format_exception(exception))
task_callstack = '\n'.join([f'{i+1}. name={task.name} priority={task.priority}' for i, task in enumerate(current_callstack)])
screenshot = device.screenshot()
logs = log_stream.getvalue()
with open('config.json', 'r', encoding='utf-8') as f:
config_content = f.read()
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
with zipfile.ZipFile(path, 'w') as zipf:
zipf.writestr('exception.txt', exception_msg)
zipf.writestr('task_callstack.txt', task_callstack)
zipf.writestr('screenshot.png', cv2.imencode('.png', screenshot)[1].tobytes())
zipf.writestr('config.json', config_content)
zipf.writestr('logs.txt', logs)
return path
except Exception as e:
logger.exception(f'Failed to save error report:')
return ''
def run(
*,
debug: bool = False,
resume_on_error: bool = False,
config_type: type = dict[str, Any],
on_finished: Optional[Callable[[], None]] = None,
on_task_status_changed: Optional[Callable[[Task, Literal['pending', 'running', 'finished', 'error', 'cancelled']], None]] = None,
on_task_error: Optional[Callable[[Task, Exception], None]] = None,
auto_save_error_report: bool = True,
):
"""
按优先级顺序运行所有任务
:param debug: 是否为调试模式调试模式下不捕获异常不保存错误报告默认为 `False`
:param resume_on_error: 是否在任务出错时继续运行默认为 `False`
:param auto_save_error_report: 是否自动保存错误报告默认 `True`
"""
# TODO: 允许在 initialize 时指定 config_type。
# TODO: 允许 init_context 时先不连接设备,而是可以之后第一次截图时连接
init_context(config_type=config_type)
tasks = sorted(task_registry.values(), key=lambda x: x.priority, reverse=True)
for task in tasks:
if on_task_status_changed:
on_task_status_changed(task, 'pending')
for task in tasks:
logger.info(f'Task started: {task.name}')
if on_task_status_changed:
on_task_status_changed(task, 'running')
if debug:
task.func()
else:
try:
task.func()
if on_task_status_changed:
on_task_status_changed(task, 'finished')
# 用户中止
except KeyboardInterrupt as e:
logger.exception('Keyboard interrupt detected.')
for task1 in tasks[tasks.index(task):]:
if on_task_status_changed:
on_task_status_changed(task1, 'cancelled')
vars.interrupted.clear()
break
# 其他错误
except Exception as e:
logger.error(f'Task failed: {task.name}')
logger.exception(f'Error: ')
report_path = None
if auto_save_error_report:
report_path = _save_error_report(e)
if on_task_status_changed:
on_task_status_changed(task, 'error')
if not resume_on_error:
for task1 in tasks[tasks.index(task)+1:]:
if on_task_status_changed:
on_task_status_changed(task1, 'cancelled')
break
logger.info(f'Task finished: {task.name}')
logger.info('All tasks finished.')
if on_finished:
on_finished()
def start(
*,
debug: bool = False,
resume_on_error: bool = False,
config_type: type = dict[str, Any],
) -> RunStatus:
run_status = RunStatus(running=True)
def _on_finished():
run_status.running = False
run_status.current_task = None
run_status.callstack = []
def _on_task_status_changed(task: Task, status: Literal['pending', 'running', 'finished', 'error']):
def _find(task: Task) -> TaskStatus:
for task_status in run_status.tasks:
if task_status.task == task:
return task_status
raise ValueError(f'Task {task.name} not found in run_status.tasks')
if status == 'pending':
run_status.tasks.append(TaskStatus(task=task, status='pending'))
else:
_find(task).status = status
thread = threading.Thread(target=run, kwargs={
'config_type': config_type,
'debug': debug,
'resume_on_error': resume_on_error,
'on_finished': _on_finished,
'on_task_status_changed': _on_task_status_changed,
})
thread.start()
return run_status
def execute(task: Task, config_type: type = dict[str, Any]):
"""
执行某个任务
:param task: 任务
:param config_type: 配置类型
"""
init_context(config_type=config_type)
initialize('kotonebot.tasks')
task.func()
if __name__ == '__main__':
from kotonebot.tasks.common import BaseConfig
from kotonebot.backend.util import Profiler
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(filename)s:%(lineno)d] - %(message)s')
logger.setLevel(logging.DEBUG)
logging.getLogger('kotonebot').setLevel(logging.DEBUG)
init_context(config_type=BaseConfig)
initialize('kotonebot.tasks')
pf = Profiler('profiler')
pf.begin()
run()
pf.end()
pf.snakeviz()

View File

@ -19,7 +19,7 @@ from kotonebot.tasks.common import (
MissionRewardConfig, PIdol, DailyMoneyShopItems
)
from kotonebot.config.base_config import UserConfig, BackendConfig
from kotonebot.run.run import initialize, start, execute
from kotonebot.backend.bot import KotoneBot
# 初始化日志
os.makedirs('logs', exist_ok=True)
@ -130,12 +130,13 @@ def _save_bug_report(
class KotoneBotUI:
def __init__(self) -> None:
self.is_running: bool = False
self.kaa: KotoneBot = KotoneBot(module='kotonebot.tasks', config_type=BaseConfig)
self._load_config()
self._setup_kaa()
def _setup_kaa(self) -> None:
initialize('kotonebot.tasks')
from kotonebot.backend.debug.vars import debug, clear_saved
self.kaa.initialize()
if self.current_config.keep_screenshots:
debug.auto_save_to_folder = 'dumps'
debug.enabled = True
@ -212,13 +213,13 @@ class KotoneBotUI:
def start_run(self) -> Tuple[str, List[List[str]]]:
self.is_running = True
initialize('kotonebot.tasks')
self.run_status = start(config_type=BaseConfig)
self.run_status = self.kaa.start_all() # 启动所有任务
return "停止", self.update_task_status()
def stop_run(self) -> Tuple[str, List[List[str]]]:
self.is_running = False
self.run_status.interrupt()
if self.kaa:
self.run_status.interrupt() # 中断运行
return "启动", self.update_task_status()
def save_settings(
@ -389,7 +390,7 @@ class KotoneBotUI:
gr.Warning(f"任务 {task_name} 未找到")
return ""
gr.Info(f"任务 {task_name} 开始执行。执行结束前,请勿重复点击执行。")
execute(task, config_type=BaseConfig)
self.kaa.run([task])
gr.Success(f"任务 {task_name} 执行完毕")
return ""