refactor(core): 移除了废弃的 dispatcher 与 action 分发器重载

This commit is contained in:
XcantloadX 2025-07-26 16:57:51 +08:00
parent b51f9cdaa4
commit 09252c5aa1
4 changed files with 34 additions and 179 deletions

View File

@ -1,13 +1,9 @@
import logging
from typing import Callable, ParamSpec, TypeVar, overload, Concatenate, Literal
from typing import Callable, ParamSpec, TypeVar, overload
from dataclasses import dataclass
from typing_extensions import deprecated
import cv2
from cv2.typing import MatLike
from .context import ContextStackVars, ScreenshotMode
from ..dispatch import dispatcher as dispatcher_decorator, DispatcherContext
from ...errors import TaskNotFoundError
P = ParamSpec('P')
@ -98,7 +94,6 @@ def action(
pass_through: bool = False,
priority: int = 0,
screenshot_mode: ScreenshotMode | None = None,
dispatcher: Literal[False] = False,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""
`action` 装饰器用于标记一个函数为动作函数
@ -110,38 +105,6 @@ def action(
如果不想跟踪则设置此参数为 False
:param priority: 动作优先级数字越大优先级越高
:param screenshot_mode: 截图模式
:param dispatcher:
是否为分发器模式默认为假
如果使用分发器则函数的第一个参数必须为 `ctx: DispatcherContext`
"""
...
@overload
@deprecated('使用普通 while 循环代替')
def action(
name: str,
*,
description: str|None = None,
pass_through: bool = False,
priority: int = 0,
screenshot_mode: ScreenshotMode | None = None,
dispatcher: Literal[True, 'fragment'] = True,
) -> Callable[[Callable[Concatenate[DispatcherContext, P], R]], Callable[P, R]]:
"""
`action` 装饰器用于标记一个函数为动作函数
此重载启用了分发器模式被装饰函数的第一个参数必须为 `ctx: DispatcherContext`
:param name: 动作名称如果为 None则使用函数的名称作为名称
:param description: 动作描述如果为 None则使用函数的 docstring 作为描述
:param pass_through:
默认情况下 @action 装饰器会包裹动作函数跟踪其执行情况
如果不想跟踪则设置此参数为 False
:param priority: 动作优先级数字越大优先级越高
:param screenshot_mode: 截图模式必须为 `'manual' / None`
:param dispatcher:
是否为分发器模式默认为假
如果使用分发器则函数的第一个参数必须为 `ctx: DispatcherContext`
"""
...
@ -176,11 +139,6 @@ def action(*args, **kwargs):
pass_through = kwargs.get('pass_through', False)
priority = kwargs.get('priority', 0)
screenshot_mode = kwargs.get('screenshot_mode', None)
dispatcher = kwargs.get('dispatcher', False)
if dispatcher == True or dispatcher == 'fragment':
if not (screenshot_mode is None or screenshot_mode == 'manual'):
raise ValueError('`screenshot_mode` must be None or "manual" when `dispatcher=True`.')
screenshot_mode = 'manual'
def _action_decorator(func: Callable):
nonlocal pass_through
action = _register(_placeholder, name, description)
@ -188,8 +146,6 @@ def action(*args, **kwargs):
if pass_through:
return func
else:
if dispatcher:
func = dispatcher_decorator(func, fragment=(dispatcher == 'fragment')) # type: ignore
def _wrapper(*args: P.args, **kwargs: P.kwargs):
current_callstack.append(action)
vars = ContextStackVars.push(screenshot_mode=screenshot_mode)

View File

@ -1,12 +1,7 @@
import time
import uuid
import logging
import inspect
from logging import Logger
from types import CodeType
from dataclasses import dataclass
from typing import Annotated, Any, Callable, Concatenate, Sequence, TypeVar, ParamSpec, Literal, Protocol, cast
from typing_extensions import deprecated
from typing import Any, Callable, Literal
from dataclasses import dataclass
@ -16,104 +11,6 @@ from kotonebot.primitives import Rect, is_rect
from .core import Image
logger = logging.getLogger(__name__)
P = ParamSpec('P')
R = TypeVar('R')
ThenAction = Literal['click', 'log']
DoAction = Literal['click']
# TODO: 需要找个地方统一管理这些属性名
ATTR_DISPATCHER_MARK = '__kb_dispatcher_mark'
ATTR_ORIGINAL_FUNC = '_kb_inner'
class DispatchFunc: pass
wrapper_to_func: dict[Callable, Callable] = {}
class DispatcherContext:
def __init__(self):
self.finished: bool = False
self._first_run: bool = True
def finish(self):
"""标记已完成 dispatcher 循环。循环将在下次条件检测时退出。"""
self.finished = True
def expand(self, func: Annotated[Callable[[], Any], DispatchFunc], ignore_finish: bool = True):
"""
调用其他 dispatcher 函数
使用 `expand` 和直接调用的区别是
* 直接调用会执行 while 循环直到满足结束条件
* 而使用 `expand` 则只会执行一次效果类似于将目标函数里的代码直接复制粘贴过来
"""
# 获取原始函数
original_func = func
while not getattr(original_func, ATTR_DISPATCHER_MARK, False):
original_func = getattr(original_func, ATTR_ORIGINAL_FUNC)
original_func = getattr(original_func, ATTR_ORIGINAL_FUNC)
if not original_func:
raise ValueError(f'{repr(func)} is not a dispatcher function.')
elif not callable(original_func):
raise ValueError(f'{repr(original_func)} is not callable.')
original_func = cast(Callable[[DispatcherContext], Any], original_func)
old_finished = self.finished
ret = original_func(self)
if ignore_finish:
self.finished = old_finished
return ret
@property
def beginning(self) -> bool:
"""是否为第一次运行"""
return self._first_run
@property
def finishing(self) -> bool:
"""是否即将结束运行"""
return self.finished
@deprecated('使用 SimpleDispatcher 类或 while 循环替代')
def dispatcher(
func: Callable[Concatenate[DispatcherContext, P], R],
*,
fragment: bool = False
) -> Annotated[Callable[P, R], DispatchFunc]:
"""
注意\n
此装饰器必须在应用 @action/@task 装饰器后再应用 `screenshot_mode='manual'` 参数必须设置
或者也可以使用 @action/@task 装饰器中的 `dispatcher=True` 参数
那么就没有上面两个要求了
:param fragment:
片段模式默认不启用
启用后被装饰函数将会只执行依次
而不会一直循环到 ctx.finish() 被调用
"""
def wrapper(*args: P.args, **kwargs: P.kwargs):
ctx = DispatcherContext()
while not ctx.finished:
from kotonebot import device
device.screenshot()
ret = func(ctx, *args, **kwargs)
ctx._first_run = False
return ret
def fragment_wrapper(*args: P.args, **kwargs: P.kwargs):
ctx = DispatcherContext()
from kotonebot import device
device.screenshot()
return func(ctx, *args, **kwargs)
setattr(wrapper, ATTR_ORIGINAL_FUNC, func)
setattr(fragment_wrapper, ATTR_ORIGINAL_FUNC, func)
setattr(wrapper, ATTR_DISPATCHER_MARK, True)
setattr(fragment_wrapper, ATTR_DISPATCHER_MARK, True)
wrapper_to_func[wrapper] = func
if fragment:
return fragment_wrapper
else:
return wrapper
@dataclass
class ClickParams:

View File

@ -8,7 +8,6 @@ from kotonebot.kaa.config import conf, DailyMoneyShopItems
from kotonebot.primitives.geometry import Point
from kotonebot.util import Countdown, cropped
from kotonebot import task, device, image, action, sleep
from kotonebot.backend.dispatch import SimpleDispatcher
from ..actions.scenes import goto_home, goto_shop, at_daily_shop
logger = logging.getLogger(__name__)

View File

@ -12,9 +12,8 @@ from .cards import do_cards, CardDetectResult
from ..actions.commu import handle_unread_commu
from kotonebot.errors import UnrecoverableError
from kotonebot.util import Countdown, Interval, cropped
from kotonebot.backend.dispatch import DispatcherContext
from kotonebot.backend.loop import Loop
from kotonebot.kaa.config import ProduceAction, RecommendCardDetectionMode
from kotonebot.kaa.config import conf
from ..produce.common import until_acquisition_clear, commu_event, fast_acquisitions
from kotonebot import ocr, device, contains, image, regex, action, sleep, wait
from ..produce.non_lesson_actions import (
@ -705,8 +704,8 @@ ProduceStage = Literal[
'unknown', # 未知场景
]
@action('检测当前培育场景', dispatcher=True)
def detect_produce_scene(ctx: DispatcherContext) -> ProduceStage:
@action('检测当前培育场景')
def detect_produce_scene() -> ProduceStage:
"""
判断当前是培育的什么阶段并开始 Regular 培育
@ -715,31 +714,35 @@ def detect_produce_scene(ctx: DispatcherContext) -> ProduceStage:
"""
logger.info("Detecting current produce stage...")
# 行动场景
texts = ocr.ocr()
if (
image.find_multi([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
])
):
logger.info("Detection result: At action scene.")
ctx.finish()
return 'action'
elif texts.where(regex('CLEARまで|PERFECTまで')):
logger.info("Detection result: At practice ongoing.")
ctx.finish()
return 'practice-ongoing'
elif is_exam_scene():
logger.info("Detection result: At exam scene.")
ctx.finish()
return 'exam-ongoing'
else:
if fast_acquisitions():
return 'unknown'
if commu_event():
return 'unknown'
return 'unknown'
for _ in Loop():
# 行动场景
texts = ocr.ocr()
if (
image.find_multi([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
])
):
logger.info("Detection result: At action scene.")
return 'action'
elif texts.where(regex('CLEARまで|PERFECTまで')):
logger.info("Detection result: At practice ongoing.")
return 'practice-ongoing'
elif is_exam_scene():
logger.info("Detection result: At exam scene.")
return 'exam-ongoing'
else:
if fast_acquisitions():
# 继续循环检测
pass
elif commu_event():
# 继续循环检测
pass
else:
return 'unknown'
# 如果没有返回,说明需要继续检测
sleep(0.5) # 等待一段时间再重新检测
return 'unknown'
@action('开始 Hajime 培育')
def hajime_from_stage(stage: ProduceStage, type: Literal['regular', 'pro', 'master'], week: int):