Compare commits

...

25 Commits

Author SHA1 Message Date
XcantloadX 524ffd58a9 docs: v2025.7.27.1 更新日志 2025-07-28 20:07:37 +08:00
XcantloadX de1328cdff fix(task): 修复检测培育阶段函数过早返回的问题 2025-07-28 20:06:19 +08:00
XcantloadX 6629bc7ae5 docs: v2025.7.27.0 更新日志 2025-07-27 17:14:52 +08:00
XcantloadX f9fafb9d71 Merge branch 'dev' 2025-07-27 17:13:18 +08:00
XcantloadX 738ec9ee78 fix(task): 修复竞赛总是卡在跳过页面
检测 SKIP 按钮默认给的阈值过高,现在调低了点。
2025-07-27 17:12:43 +08:00
XcantloadX 9c9e4af555 feat(task): 即使执行任务出错也会执行关闭游戏任务
比如关机、休眠等。
2025-07-27 17:12:43 +08:00
XcantloadX 0b7054e897 feat(core): Task 新增 pre、post、regular、manual 四种 run_at 类型 2025-07-27 17:12:43 +08:00
XcantloadX 09252c5aa1 refactor(core): 移除了废弃的 dispatcher 与 action 分发器重载 2025-07-27 17:12:43 +08:00
XcantloadX b51f9cdaa4 feat(task): 优化培育方案错误与选人未找到错误的提示 2025-07-27 17:12:43 +08:00
XcantloadX 3e544e92a9 feat(core): 新增 MessageBox 与 TaskDialog 的 Win32API 封装 2025-07-27 17:12:42 +08:00
XcantloadX 3be8485795 feat(task): 竞赛未编成时支持暂停与通知 2025-07-27 17:12:35 +08:00
XcantloadX a167cbfbe1 feat(core): 支持任务执行中只跳过或停止当前任务 2025-07-26 13:26:01 +08:00
XcantloadX ceaaed7896 fix(task): 修复 DMM 版启动游戏的过程中无法中断的问题 2025-07-26 04:54:19 +08:00
XcantloadX a922ce5738 docs: v2025.7.20.0 更新日志 2025-07-20 10:14:45 +08:00
XcantloadX d7a3494d8e Merge branch 'dev' 2025-07-20 10:12:53 +08:00
XcantloadX b07d4d3d23 fix(task): 修复商店购买中购买推荐商品时点击推荐标签不生效的问题 2025-07-20 10:08:28 +08:00
XcantloadX 4deea1d644 fix(task): 修复 DMM 上由于输出分辨率到日志中导致的启动失败问题
原因是获取分辨率时对于 DMM 版,还没有启动游戏,会抛出找不到窗口的异常。
2025-07-20 09:53:21 +08:00
XcantloadX f929046ae2 feat(ui): 将保留截图数据与跟踪推荐卡两个选项统一移动到调试设置中 2025-07-20 09:48:30 +08:00
XcantloadX 3e67627962 fix(ui): 修复新建培育方案时会自动修改当前选中的方案的问题 2025-07-20 09:05:54 +08:00
XcantloadX 1b385c09b1 fix(ui): 修复删除培育时的报错问题 2025-07-19 07:16:21 +08:00
XcantloadX acfb5548b6 fix(task): 修复严格模式下长时间卡在四张卡的推荐卡检测上 2025-07-19 06:51:03 +08:00
XcantloadX b8ade2f48c feat(ui): 首页快速功能区域新增完成后操作 2025-07-15 07:00:28 +08:00
XcantloadX 16360f5764 docs: v2025.7.13.0 更新日志 2025-07-13 12:12:57 +08:00
XcantloadX a4d3b322e0 Merge branch 'dev' 2025-07-13 12:10:50 +08:00
XcantloadX 4bea42238f fix(ui): 修复某些情况下热重载配置失败的问题
原因是上下文初始化前就调用了 config.load() 导致报错。
2025-07-13 12:06:29 +08:00
24 changed files with 1268 additions and 277 deletions

View File

@ -1,5 +1,49 @@
# 更新日志
## kaa
### v2025.7.27.1
脚本:
* [修复] 修复检测培育阶段函数过早返回的问题(#de1328c
### v2025.7.27.0
脚本:
* [修复] 修复竞赛总是卡在跳过页面(#738ec9e
* [修复] 修复 DMM 版启动游戏的过程中无法中断的问题(#ceaaed7
* [新增] 即使执行任务出错也会执行关闭游戏任务(#9c9e4af
* [新增] 优化培育方案错误与选人未找到错误的提示(#b51f9cd
* [新增] 竞赛未编成时支持暂停与通知(#3be8485
框架:
* [新增] Task 新增 pre、post、regular、manual 四种 run_at 类型(#0b7054e
* [新增] 新增 MessageBox 与 TaskDialog 的 Win32API 封装(#3e544e9
* [新增] 支持任务执行中只跳过或停止当前任务(#a167cbf
* [重构] 移除了废弃的 dispatcher 与 action 分发器重载(#09252c5
### v2025.7.20.0
脚本:
* [修复] 修复商店购买中购买推荐商品时点击推荐标签不生效的问题(#b07d4d3
* [修复] 修复 DMM 上由于输出分辨率到日志中导致的启动失败问题(#4deea1d
* [修复] 修复严格模式下长时间卡在四张卡的推荐卡检测上(#acfb554
界面:
* [修复] 修复新建培育方案时会自动修改当前选中的方案的问题(#3e67627
* [修复] 修复删除培育时的报错问题(#1b385c0
* [新增] 将保留截图数据与跟踪推荐卡两个选项统一移动到调试设置中(#f929046
* [新增] 首页快速功能区域新增完成后操作(#b8ade2f
### v2025.7.13.0
脚本:
* [新增] 上传报告时一并打包配置文件(#a8a5566
界面:
* [修复] 修复某些情况下热重载配置失败的问题(#4bea422
启动器:
* [修复] 修复当文件夹路径存在空格时启动器无法正确启动 kaa 的问题(#63f792d
* [新增] 启动器 EXE 新增多分辨率图标(#5db3ed6
其他:
* [其他] 增加对 Python 信息与分辨率信息的日志输出(#5cc9f45
### v2025.7.9.0
脚本:
* [新增] 配置中支持储存多个培育方案并支持来回切换(#41e7c8b

View File

@ -12,6 +12,15 @@ from kotonebot.client import Device
from kotonebot.client.host.protocol import Instance
from kotonebot.backend.context import init_context, vars
from kotonebot.backend.context import task_registry, action_registry, Task, Action
from kotonebot.errors import StopCurrentTask, UserFriendlyError
from kotonebot.interop.win.task_dialog import TaskDialog
@dataclass
class PostTaskContext:
has_error: bool
exception: Exception | None
log_stream = io.StringIO()
stream_handler = logging.StreamHandler(log_stream)
@ -19,10 +28,11 @@ stream_handler.setFormatter(logging.Formatter('[%(asctime)s] [%(levelname)s] [%(
logging.getLogger('kotonebot').addHandler(stream_handler)
logger = logging.getLogger(__name__)
TaskStatusValue = Literal['pending', 'running', 'finished', 'error', 'cancelled', 'stopped']
@dataclass
class TaskStatus:
task: Task
status: Literal['pending', 'running', 'finished', 'error', 'cancelled']
status: TaskStatusValue
@dataclass
class RunStatus:
@ -73,7 +83,7 @@ class Event(Generic[Params, Return]):
class KotoneBotEvents:
def __init__(self):
self.task_status_changed = Event[
[Task, Literal['pending', 'running', 'finished', 'error', 'cancelled']], None
[Task, TaskStatusValue], None
]()
self.task_error = Event[
[Task, Exception], None
@ -171,42 +181,80 @@ class KotoneBot:
self._on_after_init_context()
vars.flow.clear_interrupt()
pre_tasks = [task for task in tasks if task.run_at == 'pre']
regular_tasks = [task for task in tasks if task.run_at == 'regular']
post_tasks = [task for task in tasks if task.run_at == 'post']
if by_priority:
tasks = sorted(tasks, key=lambda x: x.priority, reverse=True)
for task in tasks:
pre_tasks = sorted(pre_tasks, key=lambda x: x.priority, reverse=True)
regular_tasks = sorted(regular_tasks, key=lambda x: x.priority, reverse=True)
post_tasks = sorted(post_tasks, key=lambda x: x.priority, reverse=True)
all_tasks = pre_tasks + regular_tasks + post_tasks
for task in all_tasks:
self.events.task_status_changed.trigger(task, 'pending')
for task in tasks:
has_error = False
exception: Exception | None = None
for task in all_tasks:
logger.info(f'Task started: {task.name}')
self.events.task_status_changed.trigger(task, 'running')
if self.debug:
task.func()
if task.run_at == 'post':
task.func(PostTaskContext(has_error, exception))
else:
task.func()
else:
try:
task.func()
if task.run_at == 'post':
task.func(PostTaskContext(has_error, exception))
else:
task.func()
self.events.task_status_changed.trigger(task, 'finished')
except StopCurrentTask:
logger.info(f'Task skipped/stopped: {task.name}')
self.events.task_status_changed.trigger(task, 'stopped')
# 用户中止
except KeyboardInterrupt as e:
logger.exception('Keyboard interrupt detected.')
for task1 in tasks[tasks.index(task):]:
for task1 in all_tasks[all_tasks.index(task):]:
self.events.task_status_changed.trigger(task1, 'cancelled')
vars.flow.clear_interrupt()
break
# 用户可以自行处理的错误
except UserFriendlyError as e:
logger.error(f'Task failed: {task.name}')
logger.exception(f'Error: ')
has_error = True
exception = e
dialog = TaskDialog(
title='琴音小助手',
common_buttons=0,
main_instruction='任务执行失败',
content=e.message,
custom_buttons=e.action_buttons,
main_icon='error'
)
result_custom, _, _ = dialog.show()
e.invoke(result_custom)
# 其他错误
except Exception as e:
logger.error(f'Task failed: {task.name}')
logger.exception(f'Error: ')
has_error = True
exception = e
report_path = None
if self.auto_save_error_report:
raise NotImplementedError
self.events.task_status_changed.trigger(task, 'error')
if not self.resume_on_error:
for task1 in tasks[tasks.index(task)+1:]:
for task1 in all_tasks[all_tasks.index(task)+1:]:
self.events.task_status_changed.trigger(task1, 'cancelled')
break
logger.info(f'Task finished: {task.name}')
logger.info('All tasks finished.')
logger.info(f'Task ended: {task.name}')
logger.info('All tasks ended.')
self.events.finished.trigger()
def run_all(self) -> None:
@ -228,7 +276,7 @@ class KotoneBot:
self.events.finished -= _on_finished
self.events.task_status_changed -= _on_task_status_changed
def _on_task_status_changed(task: Task, status: Literal['pending', 'running', 'finished', 'error', 'cancelled']):
def _on_task_status_changed(task: Task, status: TaskStatusValue):
def _find(task: Task) -> TaskStatus:
for task_status in run_status.tasks:
if task_status.task == task:

View File

@ -50,7 +50,7 @@ from kotonebot.backend.ocr import (
from kotonebot.config.manager import load_config, save_config
from kotonebot.config.base_config import UserConfig
from kotonebot.backend.core import Image, HintBox
from kotonebot.errors import KotonebotWarning
from kotonebot.errors import ContextNotInitializedError, KotonebotWarning
from kotonebot.backend.preprocessor import PreprocessorProtocol
from kotonebot.primitives import Rect
@ -719,14 +719,14 @@ class Forwarded:
if name.startswith('_FORWARD_'):
return object.__getattribute__(self, name)
if self._FORWARD_getter is None:
raise ValueError(f"Forwarded object {self._FORWARD_name} called before initialization.")
raise ContextNotInitializedError(f"Forwarded object {self._FORWARD_name} called before initialization.")
return getattr(self._FORWARD_getter(), name)
def __setattr__(self, name: str, value: Any):
if name.startswith('_FORWARD_'):
return object.__setattr__(self, name, value)
if self._FORWARD_getter is None:
raise ValueError(f"Forwarded object {self._FORWARD_name} called before initialization.")
raise ContextNotInitializedError(f"Forwarded object {self._FORWARD_name} called before initialization.")
setattr(self._FORWARD_getter(), name, value)
@ -974,7 +974,7 @@ def inject_context(
):
global _c
if _c is None:
raise RuntimeError('Context not initialized')
raise ContextNotInitializedError('Context not initialized')
_c.inject(device=device, ocr=ocr, image=image, color=color, vars=vars, debug=debug, config=config)
class ManualContextManager:

View File

@ -1,19 +1,18 @@
import logging
from typing import Callable, ParamSpec, TypeVar, overload, Concatenate, Literal
from typing import Callable, ParamSpec, TypeVar, overload, Literal
from dataclasses import dataclass
from typing_extensions import deprecated
import cv2
from cv2.typing import MatLike
from .context import ContextStackVars, ScreenshotMode
from ..dispatch import dispatcher as dispatcher_decorator, DispatcherContext
from ...errors import TaskNotFoundError
P = ParamSpec('P')
R = TypeVar('R')
logger = logging.getLogger(__name__)
TaskRunAtType = Literal['pre', 'post', 'manual', 'regular'] | str
@dataclass
class Task:
name: str
@ -24,6 +23,8 @@ class Task:
"""
任务优先级数字越大优先级越高
"""
run_at: TaskRunAtType = 'regular'
@dataclass
class Action:
@ -51,6 +52,7 @@ def task(
pass_through: bool = False,
priority: int = 0,
screenshot_mode: ScreenshotMode = 'auto',
run_at: TaskRunAtType = 'regular'
):
"""
`task` 装饰器用于标记一个函数为任务函数
@ -62,6 +64,7 @@ def task(
默认情况下 @task 装饰器会包裹任务函数跟踪其执行情况
如果不想跟踪则设置此参数为 False
:param priority: 任务优先级数字越大优先级越高
:param run_at: 任务运行时间
"""
# 设置 ID
# 获取 caller 信息
@ -70,7 +73,7 @@ def task(
description = description or func.__doc__ or ''
# TODO: task_id 冲突检测
task_id = task_id or func.__name__
task = Task(name, task_id, description, _placeholder, priority)
task = Task(name, task_id, description, _placeholder, priority, run_at)
task_registry[name] = task
logger.debug(f'Task "{name}" registered.')
if pass_through:
@ -98,7 +101,6 @@ def action(
pass_through: bool = False,
priority: int = 0,
screenshot_mode: ScreenshotMode | None = None,
dispatcher: Literal[False] = False,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""
`action` 装饰器用于标记一个函数为动作函数
@ -110,38 +112,6 @@ def action(
如果不想跟踪则设置此参数为 False
:param priority: 动作优先级数字越大优先级越高
:param screenshot_mode: 截图模式
:param dispatcher:
是否为分发器模式默认为假
如果使用分发器则函数的第一个参数必须为 `ctx: DispatcherContext`
"""
...
@overload
@deprecated('使用普通 while 循环代替')
def action(
name: str,
*,
description: str|None = None,
pass_through: bool = False,
priority: int = 0,
screenshot_mode: ScreenshotMode | None = None,
dispatcher: Literal[True, 'fragment'] = True,
) -> Callable[[Callable[Concatenate[DispatcherContext, P], R]], Callable[P, R]]:
"""
`action` 装饰器用于标记一个函数为动作函数
此重载启用了分发器模式被装饰函数的第一个参数必须为 `ctx: DispatcherContext`
:param name: 动作名称如果为 None则使用函数的名称作为名称
:param description: 动作描述如果为 None则使用函数的 docstring 作为描述
:param pass_through:
默认情况下 @action 装饰器会包裹动作函数跟踪其执行情况
如果不想跟踪则设置此参数为 False
:param priority: 动作优先级数字越大优先级越高
:param screenshot_mode: 截图模式必须为 `'manual' / None`
:param dispatcher:
是否为分发器模式默认为假
如果使用分发器则函数的第一个参数必须为 `ctx: DispatcherContext`
"""
...
@ -176,11 +146,6 @@ def action(*args, **kwargs):
pass_through = kwargs.get('pass_through', False)
priority = kwargs.get('priority', 0)
screenshot_mode = kwargs.get('screenshot_mode', None)
dispatcher = kwargs.get('dispatcher', False)
if dispatcher == True or dispatcher == 'fragment':
if not (screenshot_mode is None or screenshot_mode == 'manual'):
raise ValueError('`screenshot_mode` must be None or "manual" when `dispatcher=True`.')
screenshot_mode = 'manual'
def _action_decorator(func: Callable):
nonlocal pass_through
action = _register(_placeholder, name, description)
@ -188,8 +153,6 @@ def action(*args, **kwargs):
if pass_through:
return func
else:
if dispatcher:
func = dispatcher_decorator(func, fragment=(dispatcher == 'fragment')) # type: ignore
def _wrapper(*args: P.args, **kwargs: P.kwargs):
current_callstack.append(action)
vars = ContextStackVars.push(screenshot_mode=screenshot_mode)

View File

@ -1,12 +1,7 @@
import time
import uuid
import logging
import inspect
from logging import Logger
from types import CodeType
from dataclasses import dataclass
from typing import Annotated, Any, Callable, Concatenate, Sequence, TypeVar, ParamSpec, Literal, Protocol, cast
from typing_extensions import deprecated
from typing import Any, Callable, Literal
from dataclasses import dataclass
@ -16,104 +11,6 @@ from kotonebot.primitives import Rect, is_rect
from .core import Image
logger = logging.getLogger(__name__)
P = ParamSpec('P')
R = TypeVar('R')
ThenAction = Literal['click', 'log']
DoAction = Literal['click']
# TODO: 需要找个地方统一管理这些属性名
ATTR_DISPATCHER_MARK = '__kb_dispatcher_mark'
ATTR_ORIGINAL_FUNC = '_kb_inner'
class DispatchFunc: pass
wrapper_to_func: dict[Callable, Callable] = {}
class DispatcherContext:
def __init__(self):
self.finished: bool = False
self._first_run: bool = True
def finish(self):
"""标记已完成 dispatcher 循环。循环将在下次条件检测时退出。"""
self.finished = True
def expand(self, func: Annotated[Callable[[], Any], DispatchFunc], ignore_finish: bool = True):
"""
调用其他 dispatcher 函数
使用 `expand` 和直接调用的区别是
* 直接调用会执行 while 循环直到满足结束条件
* 而使用 `expand` 则只会执行一次效果类似于将目标函数里的代码直接复制粘贴过来
"""
# 获取原始函数
original_func = func
while not getattr(original_func, ATTR_DISPATCHER_MARK, False):
original_func = getattr(original_func, ATTR_ORIGINAL_FUNC)
original_func = getattr(original_func, ATTR_ORIGINAL_FUNC)
if not original_func:
raise ValueError(f'{repr(func)} is not a dispatcher function.')
elif not callable(original_func):
raise ValueError(f'{repr(original_func)} is not callable.')
original_func = cast(Callable[[DispatcherContext], Any], original_func)
old_finished = self.finished
ret = original_func(self)
if ignore_finish:
self.finished = old_finished
return ret
@property
def beginning(self) -> bool:
"""是否为第一次运行"""
return self._first_run
@property
def finishing(self) -> bool:
"""是否即将结束运行"""
return self.finished
@deprecated('使用 SimpleDispatcher 类或 while 循环替代')
def dispatcher(
func: Callable[Concatenate[DispatcherContext, P], R],
*,
fragment: bool = False
) -> Annotated[Callable[P, R], DispatchFunc]:
"""
注意\n
此装饰器必须在应用 @action/@task 装饰器后再应用 `screenshot_mode='manual'` 参数必须设置
或者也可以使用 @action/@task 装饰器中的 `dispatcher=True` 参数
那么就没有上面两个要求了
:param fragment:
片段模式默认不启用
启用后被装饰函数将会只执行依次
而不会一直循环到 ctx.finish() 被调用
"""
def wrapper(*args: P.args, **kwargs: P.kwargs):
ctx = DispatcherContext()
while not ctx.finished:
from kotonebot import device
device.screenshot()
ret = func(ctx, *args, **kwargs)
ctx._first_run = False
return ret
def fragment_wrapper(*args: P.args, **kwargs: P.kwargs):
ctx = DispatcherContext()
from kotonebot import device
device.screenshot()
return func(ctx, *args, **kwargs)
setattr(wrapper, ATTR_ORIGINAL_FUNC, func)
setattr(fragment_wrapper, ATTR_ORIGINAL_FUNC, func)
setattr(wrapper, ATTR_DISPATCHER_MARK, True)
setattr(fragment_wrapper, ATTR_DISPATCHER_MARK, True)
wrapper_to_func[wrapper] = func
if fragment:
return fragment_wrapper
else:
return wrapper
@dataclass
class ClickParams:

View File

@ -95,7 +95,7 @@ class FlowController:
logger.info('Interrupt requested.')
self.interrupt_event.set()
def request_pause(self) -> None:
def request_pause(self, *, wait_resume: bool = False) -> None:
"""
请求暂停任务
@ -106,6 +106,8 @@ class FlowController:
if not self.paused:
logger.info('Pause requested.')
self.paused = True
if wait_resume:
self.check()
def request_resume(self) -> None:
"""

View File

@ -1,9 +1,41 @@
from typing import Callable
class KotonebotError(Exception):
pass
class KotonebotWarning(Warning):
pass
class UserFriendlyError(KotonebotError):
def __init__(
self,
message: str,
actions: list[tuple[int, str, Callable[[], None]]] = [],
*args, **kwargs
) -> None:
super().__init__(*args, **kwargs)
self.message = message
self.actions = actions or []
@property
def action_buttons(self) -> list[tuple[int, str]]:
"""
(id: int, btn_text: str) 的形式返回所有按钮定义
"""
return [(id, text) for id, text, _ in self.actions]
def invoke(self, action_id: int):
"""
执行指定 ID action
"""
for id, _, func in self.actions:
if id == action_id:
func()
break
else:
raise ValueError(f'Action with id {action_id} not found.')
class UnrecoverableError(KotonebotError):
pass
@ -30,4 +62,11 @@ class UnscalableResolutionError(KotonebotError):
self.target_resolution = target_resolution
self.screen_size = screen_size
super().__init__(f'Cannot scale to target resolution {target_resolution}. '
f'Screen size: {screen_size}')
f'Screen size: {screen_size}')
class ContextNotInitializedError(KotonebotError):
def __init__(self, msg: str = 'Context not initialized'):
super().__init__(msg)
class StopCurrentTask(KotonebotError):
pass

View File

@ -0,0 +1,314 @@
import ctypes
from typing import Optional, Literal, List, overload
from typing_extensions import assert_never
# 按钮常量
MB_OK = 0x00000000
MB_OKCANCEL = 0x00000001
MB_ABORTRETRYIGNORE = 0x00000002
MB_YESNOCANCEL = 0x00000003
MB_YESNO = 0x00000004
MB_RETRYCANCEL = 0x00000005
MB_CANCELTRYCONTINUE = 0x00000006
# 图标常量
MB_ICONSTOP = 0x00000010
MB_ICONERROR = 0x00000010
MB_ICONQUESTION = 0x00000020
MB_ICONWARNING = 0x00000030
MB_ICONINFORMATION = 0x00000040
# 默认按钮常量
MB_DEFBUTTON1 = 0x00000000
MB_DEFBUTTON2 = 0x00000100
MB_DEFBUTTON3 = 0x00000200
MB_DEFBUTTON4 = 0x00000300
# 模态常量
MB_APPLMODAL = 0x00000000
MB_SYSTEMMODAL = 0x00001000
MB_TASKMODAL = 0x00002000
# 其他选项
MB_HELP = 0x00004000
MB_NOFOCUS = 0x00008000
MB_SETFOREGROUND = 0x00010000
MB_DEFAULT_DESKTOP_ONLY = 0x00020000
MB_TOPMOST = 0x00040000
MB_RIGHT = 0x00080000
MB_RTLREADING = 0x00100000
MB_SERVICE_NOTIFICATION = 0x00200000
# 返回值常量
IDOK = 1
IDCANCEL = 2
IDABORT = 3
IDRETRY = 4
IDIGNORE = 5
IDYES = 6
IDNO = 7
IDCLOSE = 8
IDHELP = 9
IDTRYAGAIN = 10
IDCONTINUE = 11
# 为清晰起见,定义类型别名
ButtonsType = Literal['ok', 'ok_cancel', 'abort_retry_ignore', 'yes_no_cancel', 'yes_no', 'retry_cancel', 'cancel_try_continue']
IconType = Optional[Literal['stop', 'error', 'question', 'warning', 'information']]
DefaultButtonType = Literal['button1', 'button2', 'button3', 'button4']
ModalType = Literal['application', 'system', 'task']
OptionsType = Optional[List[Literal['help', 'no_focus', 'set_foreground', 'default_desktop_only', 'topmost', 'right', 'rtl_reading', 'service_notification']]]
ReturnType = Literal['ok', 'cancel', 'abort', 'retry', 'ignore', 'yes', 'no', 'close', 'help', 'try_again', 'continue']
user32 = ctypes.windll.user32
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['ok'] = 'ok',
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['ok']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['ok_cancel'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['ok', 'cancel']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['abort_retry_ignore'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['abort', 'retry', 'ignore']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['yes_no_cancel'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['yes', 'no', 'cancel']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['yes_no'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['yes', 'no']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['retry_cancel'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['retry', 'cancel']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['cancel_try_continue'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['cancel', 'try_again', 'continue']: ...
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: ButtonsType = 'ok',
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> ReturnType:
"""
显示消息框
:param hWnd: 所属窗口的句柄可以为 None
:param text: 要显示的消息
:param caption: 消息框的标题
:param buttons: 要显示的按钮
:param icon: 要显示的图标
:param default_button: 默认按钮
:param modal: 消息框的模态
:param options: 其他杂项选项列表
:return: 表示用户点击的按钮的字符串
"""
uType = 0
# --- 按钮类型 ---
match buttons:
case 'ok':
uType |= MB_OK
case 'ok_cancel':
uType |= MB_OKCANCEL
case 'abort_retry_ignore':
uType |= MB_ABORTRETRYIGNORE
case 'yes_no_cancel':
uType |= MB_YESNOCANCEL
case 'yes_no':
uType |= MB_YESNO
case 'retry_cancel':
uType |= MB_RETRYCANCEL
case 'cancel_try_continue':
uType |= MB_CANCELTRYCONTINUE
case _:
assert_never(buttons)
# --- 图标类型 ---
if icon:
match icon:
case 'stop' | 'error':
uType |= MB_ICONSTOP
case 'question':
uType |= MB_ICONQUESTION
case 'warning':
uType |= MB_ICONWARNING
case 'information':
uType |= MB_ICONINFORMATION
case _:
assert_never(icon)
# --- 默认按钮 ---
match default_button:
case 'button1':
uType |= MB_DEFBUTTON1
case 'button2':
uType |= MB_DEFBUTTON2
case 'button3':
uType |= MB_DEFBUTTON3
case 'button4':
uType |= MB_DEFBUTTON4
case _:
assert_never(default_button)
# --- 模态 ---
match modal:
case 'application':
uType |= MB_APPLMODAL
case 'system':
uType |= MB_SYSTEMMODAL
case 'task':
uType |= MB_TASKMODAL
case _:
assert_never(modal)
# --- 其他选项 ---
if options:
for option in options:
match option:
case 'help':
uType |= MB_HELP
case 'no_focus':
uType |= MB_NOFOCUS
case 'set_foreground':
uType |= MB_SETFOREGROUND
case 'default_desktop_only':
uType |= MB_DEFAULT_DESKTOP_ONLY
case 'topmost':
uType |= MB_TOPMOST
case 'right':
uType |= MB_RIGHT
case 'rtl_reading':
uType |= MB_RTLREADING
case 'service_notification':
uType |= MB_SERVICE_NOTIFICATION
case _:
assert_never(option)
result = user32.MessageBoxW(hWnd, text, caption, uType)
match result:
case 1: # IDOK
return 'ok'
case 2: # IDCANCEL
return 'cancel'
case 3: # IDABORT
return 'abort'
case 4: # IDRETRY
return 'retry'
case 5: # IDIGNORE
return 'ignore'
case 6: # IDYES
return 'yes'
case 7: # IDNO
return 'no'
case 8: # IDCLOSE
return 'close'
case 9: # IDHELP
return 'help'
case 10: # IDTRYAGAIN
return 'try_again'
case 11: # IDCONTINUE
return 'continue'
case _:
# 对于标准消息框,不应发生这种情况
raise RuntimeError(f"Unknown MessageBox return code: {result}")
if __name__ == '__main__':
# 示例用法
response = message_box(
None,
"是否要退出程序?",
"确认",
buttons='yes_no',
icon='question'
)
if response == 'yes':
print("程序退出。")
else:
print("程序继续运行。")
message_box(
None,
"操作已完成。",
"通知",
buttons='ok',
icon='information'
)

View File

@ -0,0 +1,469 @@
import ctypes
from ctypes import wintypes
import time
from typing import List, Tuple, Optional
from typing import Literal
__all__ = [
"TaskDialog",
"TDCBF_OK_BUTTON", "TDCBF_YES_BUTTON", "TDCBF_NO_BUTTON", "TDCBF_CANCEL_BUTTON",
"TDCBF_RETRY_BUTTON", "TDCBF_CLOSE_BUTTON",
"IDOK", "IDCANCEL", "IDABORT", "IDRETRY", "IDIGNORE", "IDYES", "IDNO", "IDCLOSE",
"TD_WARNING_ICON", "TD_ERROR_ICON", "TD_INFORMATION_ICON", "TD_SHIELD_ICON"
]
# --- Windows API 常量定义 ---
# 常用按钮
TDCBF_OK_BUTTON = 0x0001
TDCBF_YES_BUTTON = 0x0002
TDCBF_NO_BUTTON = 0x0004
TDCBF_CANCEL_BUTTON = 0x0008
TDCBF_RETRY_BUTTON = 0x0010
TDCBF_CLOSE_BUTTON = 0x0020
# 对话框返回值
IDOK = 1
IDCANCEL = 2
IDABORT = 3
IDRETRY = 4
IDIGNORE = 5
IDYES = 6
IDNO = 7
IDCLOSE = 8
# 标准图标 (使用 MAKEINTRESOURCE 宏)
def MAKEINTRESOURCE(i: int) -> wintypes.LPWSTR:
return wintypes.LPWSTR(i)
TD_WARNING_ICON = MAKEINTRESOURCE(65535)
TD_ERROR_ICON = MAKEINTRESOURCE(65534)
TD_INFORMATION_ICON = MAKEINTRESOURCE(65533)
TD_SHIELD_ICON = MAKEINTRESOURCE(65532)
# Task Dialog 标志
TDF_ENABLE_HYPERLINKS = 0x0001
TDF_USE_HICON_MAIN = 0x0002
TDF_USE_HICON_FOOTER = 0x0004
TDF_ALLOW_DIALOG_CANCELLATION = 0x0008
TDF_USE_COMMAND_LINKS = 0x0010
TDF_USE_COMMAND_LINKS_NO_ICON = 0x0020
TDF_EXPAND_FOOTER_AREA = 0x0040
TDF_EXPANDED_BY_DEFAULT = 0x0080
TDF_VERIFICATION_FLAG_CHECKED = 0x0100
TDF_SHOW_PROGRESS_BAR = 0x0200
TDF_SHOW_MARQUEE_PROGRESS_BAR = 0x0400
TDF_CALLBACK_TIMER = 0x0800
TDF_POSITION_RELATIVE_TO_WINDOW = 0x1000
TDF_RTL_LAYOUT = 0x2000
TDF_NO_DEFAULT_RADIO_BUTTON = 0x4000
TDF_CAN_BE_MINIMIZED = 0x8000
# Task Dialog 通知
TDN_CREATED = 0
TDN_NAVIGATED = 1
TDN_BUTTON_CLICKED = 2
TDN_HYPERLINK_CLICKED = 3
TDN_TIMER = 4
TDN_DESTROYED = 5
TDN_RADIO_BUTTON_CLICKED = 6
TDN_DIALOG_CONSTRUCTED = 7
TDN_VERIFICATION_CLICKED = 8
TDN_HELP = 9
TDN_EXPANDO_BUTTON_CLICKED = 10
# Windows 消息
WM_USER = 0x0400
TDM_SET_PROGRESS_BAR_POS = WM_USER + 114
CommonButtonLiteral = Literal["ok", "yes", "no", "cancel", "retry", "close"]
IconLiteral = Literal["warning", "error", "information", "shield"]
# --- C 结构体定义 (使用 ctypes) ---
class TASKDIALOG_BUTTON(ctypes.Structure):
_pack_ = 1
_fields_ = [("nButtonID", ctypes.c_int),
("pszButtonText", wintypes.LPCWSTR)]
# 定义回调函数指针原型
PFTASKDIALOGCALLBACK = ctypes.WINFUNCTYPE(
ctypes.HRESULT, # 返回值
wintypes.HWND, # hwnd
ctypes.c_uint, # msg
ctypes.c_size_t, # wParam
ctypes.c_size_t, # lParam
ctypes.c_ssize_t # lpRefData
)
class TASKDIALOGCONFIG(ctypes.Structure):
_pack_ = 1
_fields_ = [
("cbSize", ctypes.c_uint),
("hwndParent", wintypes.HWND),
("hInstance", wintypes.HINSTANCE),
("dwFlags", ctypes.c_uint),
("dwCommonButtons", ctypes.c_uint),
("pszWindowTitle", wintypes.LPCWSTR),
("pszMainIcon", wintypes.LPCWSTR),
("pszMainInstruction", wintypes.LPCWSTR),
("pszContent", wintypes.LPCWSTR),
("cButtons", ctypes.c_uint),
("pButtons", ctypes.POINTER(TASKDIALOG_BUTTON)),
("nDefaultButton", ctypes.c_int),
("cRadioButtons", ctypes.c_uint),
("pRadioButtons", ctypes.POINTER(TASKDIALOG_BUTTON)),
("nDefaultRadioButton", ctypes.c_int),
("pszVerificationText", wintypes.LPCWSTR),
("pszExpandedInformation", wintypes.LPCWSTR),
("pszExpandedControlText", wintypes.LPCWSTR),
("pszCollapsedControlText", wintypes.LPCWSTR),
("pszFooterIcon", wintypes.LPCWSTR),
("pszFooter", wintypes.LPCWSTR),
("pfCallback", PFTASKDIALOGCALLBACK), # 使用定义好的原型
("lpCallbackData", ctypes.c_ssize_t),
("cxWidth", ctypes.c_uint)
]
# --- 加载 comctl32.dll 并定义函数原型 ---
comctl32 = ctypes.WinDLL('comctl32')
user32 = ctypes.WinDLL('user32')
TaskDialogIndirect = comctl32.TaskDialogIndirect
TaskDialogIndirect.restype = ctypes.HRESULT
TaskDialogIndirect.argtypes = [
ctypes.POINTER(TASKDIALOGCONFIG),
ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(wintypes.BOOL)
]
# --- Python 封装类 ---
class TaskDialog:
"""
一个用于显示 Windows TaskDialog Python 封装类
支持自定义按钮单选按钮进度条验证框等
"""
def __init__(self,
parent_hwnd: Optional[int] = None,
title: str = "Task Dialog",
main_instruction: str = "",
content: str = "",
common_buttons: int | List[CommonButtonLiteral] = TDCBF_OK_BUTTON,
main_icon: Optional[wintypes.LPWSTR | int | IconLiteral] = None,
footer: str = "",
custom_buttons: Optional[List[Tuple[int, str]]] = None,
default_button: int = 0,
radio_buttons: Optional[List[Tuple[int, str]]] = None,
default_radio_button: int = 0,
verification_text: Optional[str] = None,
verification_checked_by_default: bool = False,
show_progress_bar: bool = False,
show_marquee_progress_bar: bool = False
):
"""初始化 TaskDialog 实例。
:param parent_hwnd: 父窗口的句柄
:param title: 对话框窗口的标题
:param main_instruction: 对话框的主要指令文本
:param content: 对话框的详细内容文本
:param common_buttons: 要显示的通用按钮可以是以下两种形式之一
1. TDCBF_* 常量的按位或组合 (例如 TDCBF_OK_BUTTON | TDCBF_CANCEL_BUTTON)
2. 字符串列表支持 "ok", "yes", "no", "cancel", "retry", "close"
:param main_icon: 主图标可以是以下几种形式之一
1. TD_*_ICON 常量之一
2. HICON 句柄
3. 字符串"warning", "error", "information", "shield"
:param footer: 页脚区域显示的文本
:param custom_buttons: 自定义按钮列表每个元组包含 (按钮ID, 按钮文本)
:param default_button: 默认按钮的ID可以是通用按钮ID (例如 IDOK) 或自定义按钮ID
:param radio_buttons: 单选按钮列表每个元组包含 (按钮ID, 按钮文本)
:param default_radio_button: 默认选中的单选按钮的ID
:param verification_text: 验证复选框的文本如果为 None则不显示复选框
:param verification_checked_by_default: 验证复选框是否默认勾选
:param show_progress_bar: 是否显示标准进度条
:param show_marquee_progress_bar: 是否显示跑马灯式进度条
"""
self.config = TASKDIALOGCONFIG()
self.config.cbSize = ctypes.sizeof(TASKDIALOGCONFIG)
self.config.hwndParent = parent_hwnd
self.config.dwFlags = TDF_ALLOW_DIALOG_CANCELLATION | TDF_POSITION_RELATIVE_TO_WINDOW
self.config.dwCommonButtons = self._process_common_buttons(common_buttons)
self.config.pszWindowTitle = title
self.config.pszMainInstruction = main_instruction
self.config.pszContent = content
self.config.pszFooter = footer
self.progress: int = 0
if show_progress_bar or show_marquee_progress_bar:
# 进度条暂时还没实现
raise NotImplementedError("Progress bar is not implemented yet.")
self.config.dwFlags |= TDF_CALLBACK_TIMER
if show_progress_bar:
self.config.dwFlags |= TDF_SHOW_PROGRESS_BAR
else:
self.config.dwFlags |= TDF_SHOW_MARQUEE_PROGRESS_BAR
# 将实例方法转为 C 回调函数指针。
# 必须将其保存为实例成员,否则会被垃圾回收!
self._callback_func_ptr = PFTASKDIALOGCALLBACK(self._callback)
self.config.pfCallback = self._callback_func_ptr
# 将本实例的id作为lpCallbackData传递以便在回调中识别
self.config.lpCallbackData = id(self)
# --- 图标设置 ---
processed_icon = self._process_main_icon(main_icon)
if processed_icon is not None:
if isinstance(processed_icon, wintypes.LPWSTR):
self.config.pszMainIcon = processed_icon
else:
self.config.dwFlags |= TDF_USE_HICON_MAIN
self.config.hMainIcon = processed_icon
# --- 自定义按钮设置 ---
self.custom_buttons_list = []
if custom_buttons:
self.config.cButtons = len(custom_buttons)
button_array_type = TASKDIALOG_BUTTON * len(custom_buttons)
self.custom_buttons_list = button_array_type()
for i, (btn_id, btn_text) in enumerate(custom_buttons):
self.custom_buttons_list[i].nButtonID = btn_id
self.custom_buttons_list[i].pszButtonText = btn_text
self.config.pButtons = self.custom_buttons_list
if default_button:
self.config.nDefaultButton = default_button
# --- 单选按钮设置 ---
self.radio_buttons_list = []
if radio_buttons:
self.config.cRadioButtons = len(radio_buttons)
radio_array_type = TASKDIALOG_BUTTON * len(radio_buttons)
self.radio_buttons_list = radio_array_type()
for i, (btn_id, btn_text) in enumerate(radio_buttons):
self.radio_buttons_list[i].nButtonID = btn_id
self.radio_buttons_list[i].pszButtonText = btn_text
self.config.pRadioButtons = self.radio_buttons_list
if default_radio_button:
self.config.nDefaultRadioButton = default_radio_button
# --- 验证复选框设置 ---
if verification_text:
self.config.pszVerificationText = verification_text
if verification_checked_by_default:
self.config.dwFlags |= TDF_VERIFICATION_FLAG_CHECKED
def _process_common_buttons(self, common_buttons: int | List[CommonButtonLiteral]) -> int:
"""处理 common_buttons 参数,支持常量和字符串列表两种形式"""
if isinstance(common_buttons, int):
# 直接使用 Win32 常量
return common_buttons
elif isinstance(common_buttons, list):
# 处理字符串列表
result = 0
for button in common_buttons:
# 使用 match 和 assert_never 进行类型检查
match button:
case "ok":
result |= TDCBF_OK_BUTTON
case "yes":
result |= TDCBF_YES_BUTTON
case "no":
result |= TDCBF_NO_BUTTON
case "cancel":
result |= TDCBF_CANCEL_BUTTON
case "retry":
result |= TDCBF_RETRY_BUTTON
case "close":
result |= TDCBF_CLOSE_BUTTON
case _:
# 这在实际中不会发生,因为类型检查会阻止它
from typing import assert_never
assert_never(button)
return result
else:
raise TypeError("common_buttons must be either an int or a list of strings")
def _process_main_icon(self, main_icon: Optional[wintypes.LPWSTR | int | IconLiteral]) -> Optional[wintypes.LPWSTR | int]:
"""处理 main_icon 参数,支持常量和字符串两种形式"""
if main_icon is None:
return None
elif isinstance(main_icon, (wintypes.LPWSTR, int)):
# 直接使用 Win32 常量或 HICON 句柄
return main_icon
elif isinstance(main_icon, str):
# 处理字符串
match main_icon:
case "warning":
return TD_WARNING_ICON
case "error":
return TD_ERROR_ICON
case "information":
return TD_INFORMATION_ICON
case "shield":
return TD_SHIELD_ICON
case _:
# 这在实际中不会发生,因为类型检查会阻止它
from typing import assert_never
assert_never(main_icon)
else:
raise TypeError("main_icon must be None, a Windows constant, or a string")
def _callback(self, hwnd: wintypes.HWND, msg: int, wParam: int, lParam: int, lpRefData: int) -> int:
# 仅当 lpRefData 指向的是当前这个对象实例时才处理
if lpRefData != id(self):
return 0 # S_OK
if msg == TDN_TIMER:
# 更新进度条
if self.progress < 100:
self.progress += 5
# 发送消息给对话框来更新进度条位置
user32.SendMessageW(hwnd, TDM_SET_PROGRESS_BAR_POS, self.progress, 0)
else:
# 示例进度达到100%后可以模拟点击OK按钮关闭对话框
# from ctypes import wintypes
# user32.PostMessageW(hwnd, wintypes.UINT(1125), IDOK, 0) # TDM_CLICK_BUTTON
pass
elif msg == TDN_DESTROYED:
# 对话框已销毁
pass
return 0 # S_OK
def show(self) -> Tuple[int, int, bool]:
"""
显示对话框并返回用户交互的结果
:return: 一个元组 (button_id, radio_button_id, verification_checked)
- button_id: 用户点击的按钮ID (例如 IDOK, IDCANCEL)
- radio_button_id: 用户选择的单选按钮的ID
- verification_checked: 验证复选框是否被勾选 (True/False)
"""
pnButton = ctypes.c_int(0)
pnRadioButton = ctypes.c_int(0)
pfVerificationFlagChecked = wintypes.BOOL(False)
hr = TaskDialogIndirect(
ctypes.byref(self.config),
ctypes.byref(pnButton),
ctypes.byref(pnRadioButton),
ctypes.byref(pfVerificationFlagChecked)
)
if hr == 0: # S_OK
return pnButton.value, pnRadioButton.value, bool(pfVerificationFlagChecked.value)
else:
raise ctypes.WinError(hr)
# --- 示例用法 ---
if __name__ == '__main__':
print("--- 示例 1: 简单信息框 ---")
dlg_simple = TaskDialog(
title="操作成功",
main_instruction="您的操作已成功完成。",
content="文件已保存到您的文档目录。",
common_buttons=["ok"],
main_icon="information"
)
result_simple, _, _ = dlg_simple.show()
print(f"用户点击了按钮: {result_simple} (1=OK)\n")
print("--- 示例 2: 确认框 ---")
dlg_confirm = TaskDialog(
title="确认删除",
main_instruction="您确定要永久删除这个文件吗?",
content="这个操作无法撤销。文件将被立即删除。",
common_buttons=["yes", "no", "cancel"],
main_icon="warning",
default_button=IDNO
)
result_confirm, _, _ = dlg_confirm.show()
if result_confirm == IDYES:
print("用户选择了“是”。")
elif result_confirm == IDNO:
print("用户选择了“否”。")
elif result_confirm == IDCANCEL:
print("用户选择了“取消”。")
print(f"返回的按钮ID: {result_confirm}\n")
# 示例 3
print("--- 示例 3: 自定义按钮 ---")
CUSTOM_BUTTON_SAVE_ID = 101
CUSTOM_BUTTON_DONT_SAVE_ID = 102
my_buttons = [
(CUSTOM_BUTTON_SAVE_ID, "保存并退出"),
(CUSTOM_BUTTON_DONT_SAVE_ID, "不保存直接退出")
]
dlg_custom = TaskDialog(
title="未保存的更改",
main_instruction="文档中有未保存的更改,您想如何处理?",
custom_buttons=my_buttons,
common_buttons=["cancel"],
main_icon="warning",
footer="这是一个重要的提醒!"
)
result_custom, _, _ = dlg_custom.show()
if result_custom == CUSTOM_BUTTON_SAVE_ID:
print("用户选择了“保存并退出”。")
elif result_custom == CUSTOM_BUTTON_DONT_SAVE_ID:
print("用户选择了“不保存直接退出”。")
elif result_custom == IDCANCEL:
print("用户选择了“取消”。")
print(f"返回的按钮ID: {result_custom}\n")
# 示例 4: 带单选按钮和验证框的对话框
print("--- 示例 4: 单选按钮和验证框 ---")
RADIO_BTN_WORD_ID = 201
RADIO_BTN_EXCEL_ID = 202
RADIO_BTN_PDF_ID = 203
radio_buttons = [
(RADIO_BTN_WORD_ID, "保存为 Word 文档 (.docx)"),
(RADIO_BTN_EXCEL_ID, "保存为 Excel 表格 (.xlsx)"),
(RADIO_BTN_PDF_ID, "导出为 PDF 文档 (.pdf)")
]
dlg_radio = TaskDialog(
title="选择导出格式",
main_instruction="请选择您想要导出的文件格式。",
content="选择一个格式后,点击“确定”继续。",
common_buttons=["ok", "cancel"],
main_icon="information",
radio_buttons=radio_buttons,
default_radio_button=RADIO_BTN_PDF_ID, # 默认选中PDF
verification_text="设为我的默认导出选项",
verification_checked_by_default=True
)
btn_id, radio_id, checked = dlg_radio.show()
if btn_id == IDOK:
print(f"用户点击了“确定”。")
if radio_id == RADIO_BTN_WORD_ID:
print("选择了导出为 Word。")
elif radio_id == RADIO_BTN_EXCEL_ID:
print("选择了导出为 Excel。")
elif radio_id == RADIO_BTN_PDF_ID:
print("选择了导出为 PDF。")
if checked:
print("用户勾选了“设为我的默认导出选项”。")
else:
print("用户未勾选“设为我的默认导出选项”。")
else:
print("用户点击了“取消”。")
print(f"返回的按钮ID: {btn_id}, 单选按钮ID: {radio_id}, 验证框状态: {checked}\n")

View File

@ -4,7 +4,9 @@ import uuid
import re
import logging
from typing import Literal
from pydantic import BaseModel, ConfigDict, field_serializer, field_validator
from pydantic import BaseModel, ConfigDict, ValidationError, field_serializer, field_validator
from kotonebot.kaa.errors import ProduceSolutionInvalidError, ProduceSolutionNotFoundError
from .const import ProduceAction, RecommendCardDetectionMode
@ -217,17 +219,17 @@ class ProduceSolutionManager:
:param id: 方案ID
:return: 方案对象
:raises FileNotFoundError: 当方案不存在时
:raises ProduceSloutionNotFoundError: 当方案不存在时
"""
file_path = self._find_file_path_by_id(id)
if not file_path:
raise FileNotFoundError(f"Solution with id '{id}' not found")
raise ProduceSolutionNotFoundError(id)
try:
with open(file_path, 'r', encoding='utf-8') as f:
return ProduceSolution.model_validate_json(f.read())
except Exception as e:
raise FileNotFoundError(f"Failed to read solution with id '{id}': {e}")
except ValidationError as e:
raise ProduceSolutionInvalidError(id, file_path, e)
def duplicate(self, id: str) -> ProduceSolution:
"""
@ -235,7 +237,7 @@ class ProduceSolutionManager:
:param id: 要复制的方案ID
:return: 新的方案对象具有新的ID和名称
:raises FileNotFoundError: 当原方案不存在时
:raises ProduceSolutionNotFoundError: 当原方案不存在时
"""
original = self.read(id)

View File

@ -64,6 +64,8 @@ class ContestConfig(ConfigBaseModel):
select_which_contestant: Literal[1, 2, 3] = 1
"""选择第几个挑战者"""
when_no_set: Literal['remind', 'wait', 'auto_set', 'auto_set_silent'] = 'remind'
"""竞赛队伍未编成时应该remind=通知我并跳过竞赛wait=提醒我并等待手动编成auto_set=使用自动编成并提醒auto_set_silent=使用自动编成不提醒"""
class ProduceConfig(ConfigBaseModel):

38
kotonebot/kaa/errors.py Normal file
View File

@ -0,0 +1,38 @@
import os
from kotonebot.errors import UserFriendlyError
class KaaError(Exception):
pass
class KaaUserFriendlyError(UserFriendlyError, KaaError):
def __init__(self, message: str, help_link: str):
super().__init__(message, [
(0, '打开帮助', lambda: os.startfile(help_link)),
(1, '知道了', lambda: None)
])
class ProduceSolutionNotFoundError(KaaUserFriendlyError):
def __init__(self, solution_id: str):
self.solution_id = solution_id
super().__init__(
f'培育方案「{solution_id}」不存在,请检查设置是否正确。',
'https://kdocs.cn/l/cetCY8mGKHLj?linkname=saPrDAmMd4'
)
class ProduceSolutionInvalidError(KaaUserFriendlyError):
def __init__(self, solution_id: str, file_path: str, reason: Exception):
self.solution_id = solution_id
self.reason = reason
super().__init__(
f'培育方案「{solution_id}」(路径 {file_path})存在无效配置,载入失败。',
'https://kdocs.cn/l/cetCY8mGKHLj?linkname=xnLUW1YYKz'
)
class IdolCardNotFoundError(KaaUserFriendlyError):
def __init__(self, skin_id: str):
self.skin_id = skin_id
super().__init__(
f'未找到 ID 为「{skin_id}」的偶像卡。请检查游戏内偶像皮肤与培育方案中偶像皮肤是否一致。',
'https://kdocs.cn/l/cetCY8mGKHLj?linkname=cySASqoPGj'
)

View File

@ -14,13 +14,15 @@ from typing import List, Dict, Tuple, Literal, Generator, Callable, Any, get_arg
import cv2
import gradio as gr
from kotonebot.kaa.errors import ProduceSolutionNotFoundError
from kotonebot.kaa.main import Kaa
from kotonebot.kaa.db import IdolCard
from kotonebot.backend.context.context import vars
from kotonebot.errors import ContextNotInitializedError
from kotonebot.client.host import Mumu12Host, LeidianHost
from kotonebot.config.manager import load_config, save_config
from kotonebot.config.base_config import UserConfig, BackendConfig
from kotonebot.backend.context import task_registry, ContextStackVars
from kotonebot.backend.context.context import vars
from kotonebot.client.host import Mumu12Host, LeidianHost
from kotonebot.kaa.config import (
BaseConfig, APShopItems, CapsuleToysConfig, ClubRewardConfig, PurchaseConfig, ActivityFundsConfig,
PresentsConfig, AssignmentConfig, ContestConfig, ProduceConfig,
@ -50,7 +52,7 @@ ConfigKey = Literal[
'mini_live_reassign', 'mini_live_duration',
'online_live_reassign', 'online_live_duration',
'contest_enabled',
'select_which_contestant',
'select_which_contestant', 'when_no_set',
# produce
'produce_enabled', 'selected_solution_id', 'produce_count',
@ -390,8 +392,8 @@ class KotoneBotUI:
"""获取暂停按钮的状态和交互性"""
try:
text = "恢复" if vars.flow.is_paused else "暂停"
except ValueError:
# ValueError: Forwarded object vars called before initialization.
except ContextNotInitializedError:
# ContextNotInitializedError: Forwarded object vars called before initialization.
# TODO: vars.flow.is_paused 应该要可以在脚本正式启动前就能访问
text = '未启动'
# 如果正在停止过程中,禁用暂停按钮
@ -417,7 +419,10 @@ class KotoneBotUI:
# 重新加载 Context 中的配置数据
from kotonebot.backend.context.context import config
config.load()
try:
config.load()
except ContextNotInitializedError:
pass
logger.info("配置已成功重新加载")
return True
@ -493,8 +498,8 @@ class KotoneBotUI:
run_btn = gr.Button("启动", scale=2)
pause_btn = gr.Button("暂停", scale=1)
# 快速功能启停控制区域
gr.Markdown("### 快速功能启停")
# 快速设置控制区域
gr.Markdown("### 快速设置")
with gr.Row(elem_classes=["quick-controls-row"]):
purchase_quick = gr.Checkbox(
label="商店",
@ -557,6 +562,20 @@ class KotoneBotUI:
elem_classes=["quick-checkbox"]
)
def _get_end_action_value() -> str:
if self.current_config.options.end_game.shutdown:
return "关机"
if self.current_config.options.end_game.hibernate:
return "休眠"
return "什么都不做"
end_action_dropdown = gr.Dropdown(
label="完成后:",
choices=["什么都不做", "关机", "休眠"],
value=_get_end_action_value(),
interactive=True
)
if self._kaa.upgrade_msg:
gr.Markdown('### 配置升级报告')
gr.Markdown(self._kaa.upgrade_msg)
@ -585,7 +604,7 @@ class KotoneBotUI:
def on_pause_click(evt: gr.EventData) -> str:
return self.toggle_pause()
# 快速功能控制的事件处理函数
# 快速设置控制的事件处理函数
def save_quick_setting(field_name: str, value: bool, display_name: str):
"""保存快速设置并立即应用"""
try:
@ -634,7 +653,7 @@ class KotoneBotUI:
outputs=[pause_btn]
)
# 绑定快速功能控制的事件
# 绑定快速设置控制的事件
purchase_quick.change(
fn=lambda x: save_quick_setting('purchase', x, '商店'),
inputs=[purchase_quick]
@ -676,13 +695,42 @@ class KotoneBotUI:
inputs=[upgrade_support_card_quick]
)
# 处理完成后操作下拉框
def save_quick_end_action(action: str):
try:
if action == "关机":
self.current_config.options.end_game.shutdown = True
self.current_config.options.end_game.hibernate = False
elif action == "休眠":
self.current_config.options.end_game.shutdown = False
self.current_config.options.end_game.hibernate = True
else: # 什么都不做
self.current_config.options.end_game.shutdown = False
self.current_config.options.end_game.hibernate = False
save_config(self.config, "config.json")
# 尝试热重载配置
if self.reload_config():
gr.Success(f"✓ 完成后操作已设置为 {action}")
else:
gr.Warning("⚠ 设置已保存,但重新加载失败")
except Exception as e:
gr.Error(f"✗ 保存失败:{str(e)}")
end_action_dropdown.change(
fn=save_quick_end_action,
inputs=[end_action_dropdown]
)
# 添加定时器,分别更新按钮状态和任务状态
def update_run_button_status():
text, interactive = self.get_button_status()
return gr.Button(value=text, interactive=interactive)
def update_quick_checkboxes():
"""更新快速功能控制的 checkbox 状态,确保与设置同步"""
"""更新快速设置区域控件的状态,确保与设置同步"""
end_action_val = _get_end_action_value()
return [
gr.Checkbox(value=self.current_config.options.purchase.enabled),
gr.Checkbox(value=self.current_config.options.assignment.enabled),
@ -694,6 +742,7 @@ class KotoneBotUI:
gr.Checkbox(value=self.current_config.options.presents.enabled),
gr.Checkbox(value=self.current_config.options.capsule_toys.enabled),
gr.Checkbox(value=self.current_config.options.upgrade_support_card.enabled),
gr.Dropdown(value=end_action_val),
]
gr.Timer(1.0).tick(
@ -709,7 +758,7 @@ class KotoneBotUI:
outputs=[
purchase_quick, assignment_quick, contest_quick, produce_quick,
mission_reward_quick, club_reward_quick, activity_funds_quick, presents_quick,
capsule_toys_quick, upgrade_support_card_quick
capsule_toys_quick, upgrade_support_card_quick, end_action_dropdown
]
)
gr.Timer(1.0).tick(
@ -958,12 +1007,6 @@ class KotoneBotUI:
interactive=True
)
keep_screenshots = gr.Checkbox(
label="保留截图数据",
value=self.current_config.keep_screenshots,
info=UserConfig.model_fields['keep_screenshots'].description,
interactive=True
)
target_screenshot_interval = gr.Number(
label="最小截图间隔(秒)",
@ -1028,14 +1071,12 @@ class KotoneBotUI:
# Common settings for all backend types
self.current_config.backend.screenshot_impl = data['screenshot_method']
self.current_config.backend.target_screenshot_interval = data['target_screenshot_interval']
self.current_config.keep_screenshots = data['keep_screenshots'] # This is a UserConfig field
return set_config, {
'adb_ip': adb_ip,
'adb_port': adb_port,
'screenshot_method': screenshot_impl, # screenshot_impl is the component
'screenshot_method': screenshot_impl,
'target_screenshot_interval': target_screenshot_interval,
'keep_screenshots': keep_screenshots,
'check_emulator': check_emulator,
'emulator_path': emulator_path,
'adb_emulator_name': adb_emulator_name,
@ -1210,6 +1251,20 @@ class KotoneBotUI:
interactive=True,
info=ContestConfig.model_fields['select_which_contestant'].description
)
when_no_set_choices = [
("通知我并跳过竞赛", "remind"),
("提醒我并等待手动编成", "wait"),
("使用自动编成并提醒我", "auto_set"),
("使用自动编成", "auto_set_silent")
]
when_no_set = gr.Dropdown(
choices=when_no_set_choices,
value=self.current_config.options.contest.when_no_set,
label="竞赛队伍未编成时",
interactive=True,
info=ContestConfig.model_fields['when_no_set'].description
)
contest_enabled.change(
fn=lambda x: gr.Group(visible=x),
inputs=[contest_enabled],
@ -1219,10 +1274,12 @@ class KotoneBotUI:
def set_config(config: BaseConfig, data: dict[ConfigKey, Any]) -> None:
config.contest.enabled = data['contest_enabled']
config.contest.select_which_contestant = data['select_which_contestant']
config.contest.when_no_set = data['when_no_set']
return set_config, {
'contest_enabled': contest_enabled,
'select_which_contestant': select_which_contestant
'select_which_contestant': select_which_contestant,
'when_no_set': when_no_set
}
def _create_produce_settings(self) -> ConfigBuilderReturnValue:
@ -1313,7 +1370,7 @@ class KotoneBotUI:
if selected_solution_id:
try:
current_solution = solution_manager.read(selected_solution_id)
except FileNotFoundError:
except ProduceSolutionNotFoundError:
pass
if current_solution is None:
@ -1592,6 +1649,8 @@ class KotoneBotUI:
gr.Group(visible=False), # memory_sets_group
gr.Dropdown(visible=False), # memory_sets
gr.Checkbox(visible=False), # auto_set_support
gr.Group(visible=False), # support_card_sets_group
gr.Dropdown(visible=False), # support_card_sets
gr.Checkbox(visible=False), # use_pt_boost
gr.Checkbox(visible=False), # use_note_boost
gr.Checkbox(visible=False), # follow_producer
@ -1633,7 +1692,7 @@ class KotoneBotUI:
gr.Checkbox(value=solution.data.skip_commu, visible=True),
gr.Button(visible=True), # save_solution_btn
]
except FileNotFoundError:
except ProduceSolutionNotFoundError:
gr.Warning(f"培育方案 {solution_id} 不存在")
return on_solution_change(None)
except Exception as e:
@ -1651,12 +1710,15 @@ class KotoneBotUI:
solution_choices = [(f"{sol.name} - {sol.description or '无描述'}", sol.id) for sol in solutions]
gr.Success("新培育方案创建成功")
# 根据是否有设置Tab的下拉框来决定返回值
updated_dropdown = gr.Dropdown(choices=solution_choices, value=new_solution.id)
# 更新培育 Tab 下拉框并保持设置 Tab 当前选中值
updated_dropdown_produce = gr.Dropdown(choices=solution_choices, value=new_solution.id)
if settings_dropdown is not None:
return [updated_dropdown, updated_dropdown]
current_selected = self.current_config.options.produce.selected_solution_id
updated_dropdown_settings = gr.Dropdown(choices=solution_choices, value=current_selected)
return [updated_dropdown_produce, updated_dropdown_settings]
else:
return updated_dropdown
return updated_dropdown_produce
except Exception as e:
gr.Error(f"创建培育方案失败:{str(e)}")
if settings_dropdown is not None:
@ -1673,6 +1735,14 @@ class KotoneBotUI:
else:
return gr.Dropdown()
# 若尝试删除当前正在使用的培育方案,则拒绝并提示
if solution_id == self.current_config.options.produce.selected_solution_id:
gr.Warning("不可删除选中方案。请先在设置中选择其他方案,保存后再删除此方案。")
if settings_dropdown is not None:
return [gr.Dropdown(), gr.Dropdown()]
else:
return gr.Dropdown()
try:
solution_manager.delete(solution_id)
@ -1681,8 +1751,12 @@ class KotoneBotUI:
solution_choices = [(f"{sol.name} - {sol.description or '无描述'}", sol.id) for sol in solutions]
gr.Success("培育方案删除成功")
# 根据是否有设置Tab的下拉框来决定返回值
updated_dropdown = gr.Dropdown(choices=solution_choices, value=None)
# 删除方案后,保持当前培育方案的选择不变
current_selected = self.current_config.options.produce.selected_solution_id
if current_selected not in [sol.id for sol in solutions]:
current_selected = None # 已不存在
updated_dropdown = gr.Dropdown(choices=solution_choices, value=current_selected)
if settings_dropdown is not None:
return [updated_dropdown, updated_dropdown]
else:
@ -2162,6 +2236,37 @@ class KotoneBotUI:
'expose_to_lan': expose_to_lan
}
def _create_debug_settings(self) -> ConfigBuilderReturnValue:
"""调试设置:仅在调试时使用"""
with gr.Column():
gr.Markdown("### 调试设置")
gr.Markdown('<div style="color: red;">仅供调试使用。正常运行时务必关闭下面所有的选项。</div>')
keep_screenshots = gr.Checkbox(
label="保留截图数据",
value=self.current_config.keep_screenshots,
info=UserConfig.model_fields['keep_screenshots'].description,
interactive=True
)
trace_recommend_card_detection = gr.Checkbox(
label="跟踪推荐卡检测",
value=self.current_config.options.trace.recommend_card_detection,
info=TraceConfig.model_fields['recommend_card_detection'].description,
interactive=True
)
def set_config(config: BaseConfig, data: dict[ConfigKey, Any]) -> None:
# 保留截图数据属于 UserConfig
self.current_config.keep_screenshots = data['keep_screenshots']
# 跟踪推荐卡检测属于 BaseConfig.trace
config.trace.recommend_card_detection = data['trace_recommend_card_detection']
return set_config, {
'keep_screenshots': keep_screenshots,
'trace_recommend_card_detection': trace_recommend_card_detection
}
def _create_settings_tab(self) -> None:
with gr.Tab("设置"):
gr.Markdown("## 设置")
@ -2196,18 +2301,18 @@ class KotoneBotUI:
# 升级支援卡设置
capsule_toys_settings = self._create_capsule_toys_settings()
# 跟踪设置
trace_settings = self._create_trace_settings()
# 杂项设置
misc_settings = self._create_misc_settings()
# 启动游戏设置
start_game_settings = self._create_start_game_settings()
# 关闭游戏设置
end_game_settings = self._create_end_game_settings()
# 杂项设置
misc_settings = self._create_misc_settings()
# 调试设置(放在最后)
debug_settings = self._create_debug_settings()
save_btn = gr.Button("保存设置")
result = gr.Markdown()
@ -2225,8 +2330,8 @@ class KotoneBotUI:
capsule_toys_settings,
start_game_settings,
end_game_settings,
trace_settings,
misc_settings
misc_settings,
debug_settings
] # list of (set_func, { 'key': component, ... })
all_components = [list(ret[1].values()) for ret in all_return_values] # [[c1, c2], [c3], ...]
all_components = list(chain(*all_components)) # [c1, c2, c3, ...]

View File

@ -146,7 +146,6 @@ class Kaa(KotoneBot):
raise ValueError('Backend instance is not set.')
_set_instance(self.backend_instance)
from kotonebot import device
logger.info('Device resolution: %s', device.screen_size)
logger.info('Set target resolution to 720x1280.')
device.orientation = 'portrait'
device.target_resolution = (720, 1280)

View File

@ -2,12 +2,15 @@
import logging
from gettext import gettext as _
from kotonebot.errors import StopCurrentTask
from kotonebot.kaa.tasks import R
from kotonebot.kaa.config import conf
from kotonebot.kaa.game_ui import WhiteFilter
from kotonebot.kaa.game_ui import WhiteFilter, dialog
from ..actions.scenes import at_home, goto_home
from ..actions.loading import wait_loading_end
from kotonebot import device, image, ocr, color, action, task, user, rect_expand, sleep, contains, Interval
from kotonebot import device, image, ocr, color, action, task, rect_expand, sleep, contains, Interval
from kotonebot.backend.context.context import vars
from kotonebot.ui import user as ui_user
logger = logging.getLogger(__name__)
@ -70,11 +73,39 @@ def handle_challenge() -> bool:
# 记忆未编成 [screenshots/contest/no_memo.png]
if image.find(R.Daily.TextContestNoMemory):
logger.debug('Memory not set. Using auto-compilation.')
user.warning('竞赛未编成', _('记忆未编成。将使用自动编成。'), once=True)
if image.find(R.Daily.ButtonContestChallenge):
device.click()
return True
logger.debug('Memory not set.')
when_no_set = conf().contest.when_no_set
auto_compilation = False
match when_no_set:
case 'remind':
# 关闭编成提示弹窗
dialog.expect_no(msg='Closed memory not set dialog.')
ui_user.warning('竞赛未编成', '已跳过此次竞赛任务。')
logger.info('Contest skipped due to memory not set (remind mode).')
raise StopCurrentTask
case 'wait':
dialog.expect_no(msg='Closed memory not set dialog.')
ui_user.warning('竞赛未编成', '已自动暂停,请手动编成后返回至挑战开始页,并点击网页上「恢复」按钮或使用快捷键继续执行。')
vars.flow.request_pause(wait_resume=True)
logger.info('Contest paused due to memory not set (wait mode).')
return True
case 'auto_set' | 'auto_set_silent':
if when_no_set == 'auto_set':
ui_user.warning('竞赛未编成', '将使用自动编成。', once=True)
logger.debug('Using auto-compilation with notification.')
else: # auto_set_silent
logger.debug('Using auto-compilation silently.')
auto_compilation = True
case _:
logger.warning(f'Unknown value for contest.when_no_set: {when_no_set}, fallback to auto.')
logger.debug('Using auto-compilation silently.')
auto_compilation = True
if auto_compilation:
if image.find(R.Daily.ButtonContestChallenge):
device.click()
return True
# 勾选跳过所有
# [screenshots/contest/contest2.png]
@ -85,7 +116,7 @@ def handle_challenge() -> bool:
# 跳过所有
# [screenshots/contest/contest1.png]
if image.find(R.Daily.ButtonIconSkip, preprocessors=[WhiteFilter()]):
if image.find(R.Daily.ButtonIconSkip, preprocessors=[WhiteFilter()], threshold=0.7):
logger.debug('Skipping all.')
device.click()
return True

View File

@ -8,7 +8,6 @@ from kotonebot.kaa.config import conf, DailyMoneyShopItems
from kotonebot.primitives.geometry import Point
from kotonebot.util import Countdown, cropped
from kotonebot import task, device, image, action, sleep
from kotonebot.backend.dispatch import SimpleDispatcher
from ..actions.scenes import goto_home, goto_shop, at_daily_shop
logger = logging.getLogger(__name__)
@ -75,8 +74,9 @@ def dispatch_recommended_items():
device.screenshot()
if rec := image.find(R.Daily.TextShopRecommended):
logger.info(f'Clicking on recommended item.') # TODO: 计数
device.click()
confirm_purchase(rec.position)
pos = rec.position.offset(dx=0, dy=80)
device.click(pos)
confirm_purchase(pos)
sleep(2.5) #
elif image.find(R.Daily.IconTitleDailyShop) and not image.find(R.Daily.TextShopRecommended):
logger.info(f'No recommended item found. Finished.')

View File

@ -5,6 +5,7 @@ import logging
import _thread
import threading
from kotonebot.backend.bot import PostTaskContext
from kotonebot.ui import user
from ..kaa_context import instance
from kotonebot.kaa.config import Priority, conf
@ -35,8 +36,8 @@ def windows_close():
os.system('taskkill /f /im gakumas.exe')
logger.info("Game closed successfully")
@task('关闭游戏', priority=Priority.END_GAME)
def end_game():
@task('关闭游戏', priority=Priority.END_GAME, run_at='post')
def end_game(ctx: PostTaskContext):
"""
游戏结束时执行的任务
"""
@ -101,4 +102,4 @@ if __name__ == '__main__':
conf().end_game.kill_game = True
conf().end_game.kill_dmm = True
conf().end_game.kill_emulator = True
end_game()
end_game(PostTaskContext(False, None))

View File

@ -12,9 +12,8 @@ from .cards import do_cards, CardDetectResult
from ..actions.commu import handle_unread_commu
from kotonebot.errors import UnrecoverableError
from kotonebot.util import Countdown, Interval, cropped
from kotonebot.backend.dispatch import DispatcherContext
from kotonebot.backend.loop import Loop
from kotonebot.kaa.config import ProduceAction, RecommendCardDetectionMode
from kotonebot.kaa.config import conf
from ..produce.common import until_acquisition_clear, commu_event, fast_acquisitions
from kotonebot import ocr, device, contains, image, regex, action, sleep, wait
from ..produce.non_lesson_actions import (
@ -197,8 +196,8 @@ def practice():
is_strict_mode = produce_solution().data.recommend_card_detection_mode == RecommendCardDetectionMode.STRICT
if is_strict_mode:
return (
result.score >= 0.05
and len(list(filter(lambda x: x >= 0.05, border_scores))) >= 3
result.score >= 0.043
and len(list(filter(lambda x: x >= 0.04, border_scores))) >= 3
)
else:
return result.score >= 0.03
@ -255,7 +254,7 @@ def exam(type: Literal['mid', 'final']):
if result.type == 10: # SKIP
return total(0.4) and borders(0.02)
else:
return total(0.2) and borders(0.02)
return total(0.15) and borders(0.02)
else:
return total(0.10) and borders(0.01)
@ -705,8 +704,8 @@ ProduceStage = Literal[
'unknown', # 未知场景
]
@action('检测当前培育场景', dispatcher=True)
def detect_produce_scene(ctx: DispatcherContext) -> ProduceStage:
@action('检测当前培育场景')
def detect_produce_scene() -> ProduceStage:
"""
判断当前是培育的什么阶段并开始 Regular 培育
@ -715,31 +714,33 @@ def detect_produce_scene(ctx: DispatcherContext) -> ProduceStage:
"""
logger.info("Detecting current produce stage...")
# 行动场景
texts = ocr.ocr()
if (
image.find_multi([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
])
):
logger.info("Detection result: At action scene.")
ctx.finish()
return 'action'
elif texts.where(regex('CLEARまで|PERFECTまで')):
logger.info("Detection result: At practice ongoing.")
ctx.finish()
return 'practice-ongoing'
elif is_exam_scene():
logger.info("Detection result: At exam scene.")
ctx.finish()
return 'exam-ongoing'
else:
if fast_acquisitions():
return 'unknown'
if commu_event():
return 'unknown'
return 'unknown'
for _ in Loop():
# 行动场景
texts = ocr.ocr()
if (
image.find_multi([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
])
):
logger.info("Detection result: At action scene.")
return 'action'
elif texts.where(regex('CLEARまで|PERFECTまで')):
logger.info("Detection result: At practice ongoing.")
return 'practice-ongoing'
elif is_exam_scene():
logger.info("Detection result: At exam scene.")
return 'exam-ongoing'
else:
if fast_acquisitions():
# 继续循环检测
pass
elif commu_event():
# 继续循环检测
pass
# 如果没有返回,说明需要继续检测
sleep(0.5) # 等待一段时间再重新检测
return 'unknown'
@action('开始 Hajime 培育')
def hajime_from_stage(stage: ProduceStage, type: Literal['regular', 'pro', 'master'], week: int):

View File

@ -15,6 +15,7 @@ from kotonebot.kaa.game_ui.idols_overview import locate_idol, match_idol
from ..produce.in_purodyuusu import hajime_pro, hajime_regular, hajime_master, resume_pro_produce, resume_regular_produce, \
resume_master_produce
from kotonebot import device, image, ocr, task, action, sleep, contains, regex
from kotonebot.kaa.errors import IdolCardNotFoundError
logger = logging.getLogger(__name__)
@ -58,7 +59,7 @@ def select_idol(skin_id: str):
# 选择偶像
pos = locate_idol(skin_id)
if pos is None:
raise ValueError(f"Idol {skin_id} not found.")
raise IdolCardNotFoundError(skin_id)
# 确认
it.reset()
while btn_confirm := image.find(R.Common.ButtonConfirmNoIcon):

View File

@ -12,6 +12,7 @@ from .actions.scenes import at_home, goto_home
from .actions.commu import handle_unread_commu
from kotonebot.errors import GameUpdateNeededError
from kotonebot import task, action, sleep, device, image, ocr, config
from kotonebot.backend.context.context import vars
logger = logging.getLogger(__name__)
@ -170,6 +171,7 @@ def windows_launch():
# 等待游戏窗口出现
it = Interval()
while True:
vars.flow.check()
if ahk.find_window(title='gakumas', title_match_mode=3):
logger.debug('Game window found.')
break

View File

@ -4,6 +4,7 @@ import time
import cv2
from cv2.typing import MatLike
from win11toast import toast
from .pushkit import Wxpusher
from .. import logging
@ -25,17 +26,6 @@ def retry(func):
continue
return wrapper
def ask(
question: str,
options: list[str],
*,
timeout: float = -1,
) -> bool:
"""
询问用户
"""
raise NotImplementedError
def _save_local(
title: str,
message: str,
@ -74,6 +64,43 @@ def push(
logger.warning('push remote message failed: %s', e)
_save_local(title, message, images)
def _show_toast(title: str, message: str | None = None, buttons: list[str] | None = None):
"""
统一的 Toast 通知函数
:param title: 通知标题
:param message: 通知消息内容
:param buttons: 按钮列表如果提供则显示带按钮的通知
"""
try:
if buttons:
logger.verbose('showing toast notification with buttons: %s - %s', title, message or '')
toast(title, message or '', buttons=buttons)
else:
# 如果没有 message只显示 title
if message:
logger.verbose('showing toast notification: %s - %s', title, message)
toast(title, message)
else:
logger.verbose('showing toast notification: %s', title)
toast(title)
except Exception as e:
logger.warning('toast notification failed: %s', e)
def ask(
question: str,
options: list[tuple[str, str]],
*,
timeout: float = -1,
) -> str:
"""
询问用户
"""
# 将选项转换为按钮列表
buttons = [option[1] for option in options]
_show_toast("琴音小助手询问", question, buttons=buttons)
raise NotImplementedError
def info(
title: str,
message: str | None = None,
@ -83,6 +110,7 @@ def info(
):
logger.info('user.info: %s', message)
push('KAA' + title, message, images=images)
_show_toast('KAA' + title, message)
def warning(
title: str,
@ -98,7 +126,8 @@ def warning(
:param once: 每次运行是否只显示一次
"""
logger.warning('user.warning: %s', message)
push("KAA 警告:" + title, message, images=images)
push("琴音小助手警告:" + title, message, images=images)
_show_toast("琴音小助手警告:" + title, message)
def error(
title: str,
@ -111,4 +140,5 @@ def error(
错误信息
"""
logger.error('user.error: %s', message)
push("KAA 错误:" + title, message, images=images)
push("琴音小助手错误:" + title, message, images=images)
_show_toast("琴音小助手错误:" + title, message)

View File

@ -41,6 +41,7 @@ dependencies = [
# TODO: move these dependencies to optional-dependencies
"pywin32==310",
"ahk==1.8.3",
"win11toast==0.35", # For Windows Toast Notification
]

View File

@ -1,4 +1,5 @@
-r requirements.txt
pywin32==310
ahk==1.8.3
ahk==1.8.3
win11toast==0.35

View File

@ -12,6 +12,7 @@ from kotonebot.kaa.config.produce import (
ProduceSolutionManager
)
from kotonebot.kaa.config.const import ProduceAction, RecommendCardDetectionMode
from kotonebot.kaa.errors import ProduceSolutionNotFoundError
class TestProduceData(TestCase):
@ -360,7 +361,7 @@ class TestProduceSolutionManager(TestCase):
def test_read_nonexistent_solution(self):
"""测试读取不存在的方案"""
with self.assertRaises(FileNotFoundError) as context:
with self.assertRaises(ProduceSolutionNotFoundError) as context:
self.manager.read('nonexistent_id')
self.assertIn("Solution with id 'nonexistent_id' not found", str(context.exception))
@ -429,7 +430,7 @@ class TestProduceSolutionManager(TestCase):
def test_duplicate_nonexistent_solution(self):
"""测试复制不存在的方案"""
with self.assertRaises(FileNotFoundError):
with self.assertRaises(ProduceSolutionNotFoundError):
self.manager.duplicate('nonexistent_id')
def test_corrupted_json_handling(self):