Merge branch 'dev'

This commit is contained in:
XcantloadX 2025-07-27 17:13:18 +08:00
commit f9fafb9d71
21 changed files with 1091 additions and 234 deletions

View File

@ -12,6 +12,15 @@ from kotonebot.client import Device
from kotonebot.client.host.protocol import Instance
from kotonebot.backend.context import init_context, vars
from kotonebot.backend.context import task_registry, action_registry, Task, Action
from kotonebot.errors import StopCurrentTask, UserFriendlyError
from kotonebot.interop.win.task_dialog import TaskDialog
@dataclass
class PostTaskContext:
has_error: bool
exception: Exception | None
log_stream = io.StringIO()
stream_handler = logging.StreamHandler(log_stream)
@ -19,10 +28,11 @@ stream_handler.setFormatter(logging.Formatter('[%(asctime)s] [%(levelname)s] [%(
logging.getLogger('kotonebot').addHandler(stream_handler)
logger = logging.getLogger(__name__)
TaskStatusValue = Literal['pending', 'running', 'finished', 'error', 'cancelled', 'stopped']
@dataclass
class TaskStatus:
task: Task
status: Literal['pending', 'running', 'finished', 'error', 'cancelled']
status: TaskStatusValue
@dataclass
class RunStatus:
@ -73,7 +83,7 @@ class Event(Generic[Params, Return]):
class KotoneBotEvents:
def __init__(self):
self.task_status_changed = Event[
[Task, Literal['pending', 'running', 'finished', 'error', 'cancelled']], None
[Task, TaskStatusValue], None
]()
self.task_error = Event[
[Task, Exception], None
@ -171,42 +181,80 @@ class KotoneBot:
self._on_after_init_context()
vars.flow.clear_interrupt()
pre_tasks = [task for task in tasks if task.run_at == 'pre']
regular_tasks = [task for task in tasks if task.run_at == 'regular']
post_tasks = [task for task in tasks if task.run_at == 'post']
if by_priority:
tasks = sorted(tasks, key=lambda x: x.priority, reverse=True)
for task in tasks:
pre_tasks = sorted(pre_tasks, key=lambda x: x.priority, reverse=True)
regular_tasks = sorted(regular_tasks, key=lambda x: x.priority, reverse=True)
post_tasks = sorted(post_tasks, key=lambda x: x.priority, reverse=True)
all_tasks = pre_tasks + regular_tasks + post_tasks
for task in all_tasks:
self.events.task_status_changed.trigger(task, 'pending')
for task in tasks:
has_error = False
exception: Exception | None = None
for task in all_tasks:
logger.info(f'Task started: {task.name}')
self.events.task_status_changed.trigger(task, 'running')
if self.debug:
task.func()
if task.run_at == 'post':
task.func(PostTaskContext(has_error, exception))
else:
task.func()
else:
try:
task.func()
if task.run_at == 'post':
task.func(PostTaskContext(has_error, exception))
else:
task.func()
self.events.task_status_changed.trigger(task, 'finished')
except StopCurrentTask:
logger.info(f'Task skipped/stopped: {task.name}')
self.events.task_status_changed.trigger(task, 'stopped')
# 用户中止
except KeyboardInterrupt as e:
logger.exception('Keyboard interrupt detected.')
for task1 in tasks[tasks.index(task):]:
for task1 in all_tasks[all_tasks.index(task):]:
self.events.task_status_changed.trigger(task1, 'cancelled')
vars.flow.clear_interrupt()
break
# 用户可以自行处理的错误
except UserFriendlyError as e:
logger.error(f'Task failed: {task.name}')
logger.exception(f'Error: ')
has_error = True
exception = e
dialog = TaskDialog(
title='琴音小助手',
common_buttons=0,
main_instruction='任务执行失败',
content=e.message,
custom_buttons=e.action_buttons,
main_icon='error'
)
result_custom, _, _ = dialog.show()
e.invoke(result_custom)
# 其他错误
except Exception as e:
logger.error(f'Task failed: {task.name}')
logger.exception(f'Error: ')
has_error = True
exception = e
report_path = None
if self.auto_save_error_report:
raise NotImplementedError
self.events.task_status_changed.trigger(task, 'error')
if not self.resume_on_error:
for task1 in tasks[tasks.index(task)+1:]:
for task1 in all_tasks[all_tasks.index(task)+1:]:
self.events.task_status_changed.trigger(task1, 'cancelled')
break
logger.info(f'Task finished: {task.name}')
logger.info('All tasks finished.')
logger.info(f'Task ended: {task.name}')
logger.info('All tasks ended.')
self.events.finished.trigger()
def run_all(self) -> None:
@ -228,7 +276,7 @@ class KotoneBot:
self.events.finished -= _on_finished
self.events.task_status_changed -= _on_task_status_changed
def _on_task_status_changed(task: Task, status: Literal['pending', 'running', 'finished', 'error', 'cancelled']):
def _on_task_status_changed(task: Task, status: TaskStatusValue):
def _find(task: Task) -> TaskStatus:
for task_status in run_status.tasks:
if task_status.task == task:

View File

@ -1,19 +1,18 @@
import logging
from typing import Callable, ParamSpec, TypeVar, overload, Concatenate, Literal
from typing import Callable, ParamSpec, TypeVar, overload, Literal
from dataclasses import dataclass
from typing_extensions import deprecated
import cv2
from cv2.typing import MatLike
from .context import ContextStackVars, ScreenshotMode
from ..dispatch import dispatcher as dispatcher_decorator, DispatcherContext
from ...errors import TaskNotFoundError
P = ParamSpec('P')
R = TypeVar('R')
logger = logging.getLogger(__name__)
TaskRunAtType = Literal['pre', 'post', 'manual', 'regular'] | str
@dataclass
class Task:
name: str
@ -24,6 +23,8 @@ class Task:
"""
任务优先级数字越大优先级越高
"""
run_at: TaskRunAtType = 'regular'
@dataclass
class Action:
@ -51,6 +52,7 @@ def task(
pass_through: bool = False,
priority: int = 0,
screenshot_mode: ScreenshotMode = 'auto',
run_at: TaskRunAtType = 'regular'
):
"""
`task` 装饰器用于标记一个函数为任务函数
@ -62,6 +64,7 @@ def task(
默认情况下 @task 装饰器会包裹任务函数跟踪其执行情况
如果不想跟踪则设置此参数为 False
:param priority: 任务优先级数字越大优先级越高
:param run_at: 任务运行时间
"""
# 设置 ID
# 获取 caller 信息
@ -70,7 +73,7 @@ def task(
description = description or func.__doc__ or ''
# TODO: task_id 冲突检测
task_id = task_id or func.__name__
task = Task(name, task_id, description, _placeholder, priority)
task = Task(name, task_id, description, _placeholder, priority, run_at)
task_registry[name] = task
logger.debug(f'Task "{name}" registered.')
if pass_through:
@ -98,7 +101,6 @@ def action(
pass_through: bool = False,
priority: int = 0,
screenshot_mode: ScreenshotMode | None = None,
dispatcher: Literal[False] = False,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""
`action` 装饰器用于标记一个函数为动作函数
@ -110,38 +112,6 @@ def action(
如果不想跟踪则设置此参数为 False
:param priority: 动作优先级数字越大优先级越高
:param screenshot_mode: 截图模式
:param dispatcher:
是否为分发器模式默认为假
如果使用分发器则函数的第一个参数必须为 `ctx: DispatcherContext`
"""
...
@overload
@deprecated('使用普通 while 循环代替')
def action(
name: str,
*,
description: str|None = None,
pass_through: bool = False,
priority: int = 0,
screenshot_mode: ScreenshotMode | None = None,
dispatcher: Literal[True, 'fragment'] = True,
) -> Callable[[Callable[Concatenate[DispatcherContext, P], R]], Callable[P, R]]:
"""
`action` 装饰器用于标记一个函数为动作函数
此重载启用了分发器模式被装饰函数的第一个参数必须为 `ctx: DispatcherContext`
:param name: 动作名称如果为 None则使用函数的名称作为名称
:param description: 动作描述如果为 None则使用函数的 docstring 作为描述
:param pass_through:
默认情况下 @action 装饰器会包裹动作函数跟踪其执行情况
如果不想跟踪则设置此参数为 False
:param priority: 动作优先级数字越大优先级越高
:param screenshot_mode: 截图模式必须为 `'manual' / None`
:param dispatcher:
是否为分发器模式默认为假
如果使用分发器则函数的第一个参数必须为 `ctx: DispatcherContext`
"""
...
@ -176,11 +146,6 @@ def action(*args, **kwargs):
pass_through = kwargs.get('pass_through', False)
priority = kwargs.get('priority', 0)
screenshot_mode = kwargs.get('screenshot_mode', None)
dispatcher = kwargs.get('dispatcher', False)
if dispatcher == True or dispatcher == 'fragment':
if not (screenshot_mode is None or screenshot_mode == 'manual'):
raise ValueError('`screenshot_mode` must be None or "manual" when `dispatcher=True`.')
screenshot_mode = 'manual'
def _action_decorator(func: Callable):
nonlocal pass_through
action = _register(_placeholder, name, description)
@ -188,8 +153,6 @@ def action(*args, **kwargs):
if pass_through:
return func
else:
if dispatcher:
func = dispatcher_decorator(func, fragment=(dispatcher == 'fragment')) # type: ignore
def _wrapper(*args: P.args, **kwargs: P.kwargs):
current_callstack.append(action)
vars = ContextStackVars.push(screenshot_mode=screenshot_mode)

View File

@ -1,12 +1,7 @@
import time
import uuid
import logging
import inspect
from logging import Logger
from types import CodeType
from dataclasses import dataclass
from typing import Annotated, Any, Callable, Concatenate, Sequence, TypeVar, ParamSpec, Literal, Protocol, cast
from typing_extensions import deprecated
from typing import Any, Callable, Literal
from dataclasses import dataclass
@ -16,104 +11,6 @@ from kotonebot.primitives import Rect, is_rect
from .core import Image
logger = logging.getLogger(__name__)
P = ParamSpec('P')
R = TypeVar('R')
ThenAction = Literal['click', 'log']
DoAction = Literal['click']
# TODO: 需要找个地方统一管理这些属性名
ATTR_DISPATCHER_MARK = '__kb_dispatcher_mark'
ATTR_ORIGINAL_FUNC = '_kb_inner'
class DispatchFunc: pass
wrapper_to_func: dict[Callable, Callable] = {}
class DispatcherContext:
def __init__(self):
self.finished: bool = False
self._first_run: bool = True
def finish(self):
"""标记已完成 dispatcher 循环。循环将在下次条件检测时退出。"""
self.finished = True
def expand(self, func: Annotated[Callable[[], Any], DispatchFunc], ignore_finish: bool = True):
"""
调用其他 dispatcher 函数
使用 `expand` 和直接调用的区别是
* 直接调用会执行 while 循环直到满足结束条件
* 而使用 `expand` 则只会执行一次效果类似于将目标函数里的代码直接复制粘贴过来
"""
# 获取原始函数
original_func = func
while not getattr(original_func, ATTR_DISPATCHER_MARK, False):
original_func = getattr(original_func, ATTR_ORIGINAL_FUNC)
original_func = getattr(original_func, ATTR_ORIGINAL_FUNC)
if not original_func:
raise ValueError(f'{repr(func)} is not a dispatcher function.')
elif not callable(original_func):
raise ValueError(f'{repr(original_func)} is not callable.')
original_func = cast(Callable[[DispatcherContext], Any], original_func)
old_finished = self.finished
ret = original_func(self)
if ignore_finish:
self.finished = old_finished
return ret
@property
def beginning(self) -> bool:
"""是否为第一次运行"""
return self._first_run
@property
def finishing(self) -> bool:
"""是否即将结束运行"""
return self.finished
@deprecated('使用 SimpleDispatcher 类或 while 循环替代')
def dispatcher(
func: Callable[Concatenate[DispatcherContext, P], R],
*,
fragment: bool = False
) -> Annotated[Callable[P, R], DispatchFunc]:
"""
注意\n
此装饰器必须在应用 @action/@task 装饰器后再应用 `screenshot_mode='manual'` 参数必须设置
或者也可以使用 @action/@task 装饰器中的 `dispatcher=True` 参数
那么就没有上面两个要求了
:param fragment:
片段模式默认不启用
启用后被装饰函数将会只执行依次
而不会一直循环到 ctx.finish() 被调用
"""
def wrapper(*args: P.args, **kwargs: P.kwargs):
ctx = DispatcherContext()
while not ctx.finished:
from kotonebot import device
device.screenshot()
ret = func(ctx, *args, **kwargs)
ctx._first_run = False
return ret
def fragment_wrapper(*args: P.args, **kwargs: P.kwargs):
ctx = DispatcherContext()
from kotonebot import device
device.screenshot()
return func(ctx, *args, **kwargs)
setattr(wrapper, ATTR_ORIGINAL_FUNC, func)
setattr(fragment_wrapper, ATTR_ORIGINAL_FUNC, func)
setattr(wrapper, ATTR_DISPATCHER_MARK, True)
setattr(fragment_wrapper, ATTR_DISPATCHER_MARK, True)
wrapper_to_func[wrapper] = func
if fragment:
return fragment_wrapper
else:
return wrapper
@dataclass
class ClickParams:

View File

@ -95,7 +95,7 @@ class FlowController:
logger.info('Interrupt requested.')
self.interrupt_event.set()
def request_pause(self) -> None:
def request_pause(self, *, wait_resume: bool = False) -> None:
"""
请求暂停任务
@ -106,6 +106,8 @@ class FlowController:
if not self.paused:
logger.info('Pause requested.')
self.paused = True
if wait_resume:
self.check()
def request_resume(self) -> None:
"""

View File

@ -1,9 +1,41 @@
from typing import Callable
class KotonebotError(Exception):
pass
class KotonebotWarning(Warning):
pass
class UserFriendlyError(KotonebotError):
def __init__(
self,
message: str,
actions: list[tuple[int, str, Callable[[], None]]] = [],
*args, **kwargs
) -> None:
super().__init__(*args, **kwargs)
self.message = message
self.actions = actions or []
@property
def action_buttons(self) -> list[tuple[int, str]]:
"""
(id: int, btn_text: str) 的形式返回所有按钮定义
"""
return [(id, text) for id, text, _ in self.actions]
def invoke(self, action_id: int):
"""
执行指定 ID action
"""
for id, _, func in self.actions:
if id == action_id:
func()
break
else:
raise ValueError(f'Action with id {action_id} not found.')
class UnrecoverableError(KotonebotError):
pass
@ -31,7 +63,10 @@ class UnscalableResolutionError(KotonebotError):
self.screen_size = screen_size
super().__init__(f'Cannot scale to target resolution {target_resolution}. '
f'Screen size: {screen_size}')
class ContextNotInitializedError(KotonebotError):
def __init__(self, msg: str = 'Context not initialized'):
super().__init__(msg)
super().__init__(msg)
class StopCurrentTask(KotonebotError):
pass

View File

@ -0,0 +1,314 @@
import ctypes
from typing import Optional, Literal, List, overload
from typing_extensions import assert_never
# 按钮常量
MB_OK = 0x00000000
MB_OKCANCEL = 0x00000001
MB_ABORTRETRYIGNORE = 0x00000002
MB_YESNOCANCEL = 0x00000003
MB_YESNO = 0x00000004
MB_RETRYCANCEL = 0x00000005
MB_CANCELTRYCONTINUE = 0x00000006
# 图标常量
MB_ICONSTOP = 0x00000010
MB_ICONERROR = 0x00000010
MB_ICONQUESTION = 0x00000020
MB_ICONWARNING = 0x00000030
MB_ICONINFORMATION = 0x00000040
# 默认按钮常量
MB_DEFBUTTON1 = 0x00000000
MB_DEFBUTTON2 = 0x00000100
MB_DEFBUTTON3 = 0x00000200
MB_DEFBUTTON4 = 0x00000300
# 模态常量
MB_APPLMODAL = 0x00000000
MB_SYSTEMMODAL = 0x00001000
MB_TASKMODAL = 0x00002000
# 其他选项
MB_HELP = 0x00004000
MB_NOFOCUS = 0x00008000
MB_SETFOREGROUND = 0x00010000
MB_DEFAULT_DESKTOP_ONLY = 0x00020000
MB_TOPMOST = 0x00040000
MB_RIGHT = 0x00080000
MB_RTLREADING = 0x00100000
MB_SERVICE_NOTIFICATION = 0x00200000
# 返回值常量
IDOK = 1
IDCANCEL = 2
IDABORT = 3
IDRETRY = 4
IDIGNORE = 5
IDYES = 6
IDNO = 7
IDCLOSE = 8
IDHELP = 9
IDTRYAGAIN = 10
IDCONTINUE = 11
# 为清晰起见,定义类型别名
ButtonsType = Literal['ok', 'ok_cancel', 'abort_retry_ignore', 'yes_no_cancel', 'yes_no', 'retry_cancel', 'cancel_try_continue']
IconType = Optional[Literal['stop', 'error', 'question', 'warning', 'information']]
DefaultButtonType = Literal['button1', 'button2', 'button3', 'button4']
ModalType = Literal['application', 'system', 'task']
OptionsType = Optional[List[Literal['help', 'no_focus', 'set_foreground', 'default_desktop_only', 'topmost', 'right', 'rtl_reading', 'service_notification']]]
ReturnType = Literal['ok', 'cancel', 'abort', 'retry', 'ignore', 'yes', 'no', 'close', 'help', 'try_again', 'continue']
user32 = ctypes.windll.user32
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['ok'] = 'ok',
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['ok']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['ok_cancel'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['ok', 'cancel']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['abort_retry_ignore'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['abort', 'retry', 'ignore']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['yes_no_cancel'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['yes', 'no', 'cancel']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['yes_no'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['yes', 'no']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['retry_cancel'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['retry', 'cancel']: ...
@overload
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: Literal['cancel_try_continue'],
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> Literal['cancel', 'try_again', 'continue']: ...
def message_box(
hWnd: Optional[int],
text: str,
caption: str,
buttons: ButtonsType = 'ok',
icon: IconType = None,
default_button: DefaultButtonType = 'button1',
modal: ModalType = 'application',
options: OptionsType = None
) -> ReturnType:
"""
显示消息框
:param hWnd: 所属窗口的句柄可以为 None
:param text: 要显示的消息
:param caption: 消息框的标题
:param buttons: 要显示的按钮
:param icon: 要显示的图标
:param default_button: 默认按钮
:param modal: 消息框的模态
:param options: 其他杂项选项列表
:return: 表示用户点击的按钮的字符串
"""
uType = 0
# --- 按钮类型 ---
match buttons:
case 'ok':
uType |= MB_OK
case 'ok_cancel':
uType |= MB_OKCANCEL
case 'abort_retry_ignore':
uType |= MB_ABORTRETRYIGNORE
case 'yes_no_cancel':
uType |= MB_YESNOCANCEL
case 'yes_no':
uType |= MB_YESNO
case 'retry_cancel':
uType |= MB_RETRYCANCEL
case 'cancel_try_continue':
uType |= MB_CANCELTRYCONTINUE
case _:
assert_never(buttons)
# --- 图标类型 ---
if icon:
match icon:
case 'stop' | 'error':
uType |= MB_ICONSTOP
case 'question':
uType |= MB_ICONQUESTION
case 'warning':
uType |= MB_ICONWARNING
case 'information':
uType |= MB_ICONINFORMATION
case _:
assert_never(icon)
# --- 默认按钮 ---
match default_button:
case 'button1':
uType |= MB_DEFBUTTON1
case 'button2':
uType |= MB_DEFBUTTON2
case 'button3':
uType |= MB_DEFBUTTON3
case 'button4':
uType |= MB_DEFBUTTON4
case _:
assert_never(default_button)
# --- 模态 ---
match modal:
case 'application':
uType |= MB_APPLMODAL
case 'system':
uType |= MB_SYSTEMMODAL
case 'task':
uType |= MB_TASKMODAL
case _:
assert_never(modal)
# --- 其他选项 ---
if options:
for option in options:
match option:
case 'help':
uType |= MB_HELP
case 'no_focus':
uType |= MB_NOFOCUS
case 'set_foreground':
uType |= MB_SETFOREGROUND
case 'default_desktop_only':
uType |= MB_DEFAULT_DESKTOP_ONLY
case 'topmost':
uType |= MB_TOPMOST
case 'right':
uType |= MB_RIGHT
case 'rtl_reading':
uType |= MB_RTLREADING
case 'service_notification':
uType |= MB_SERVICE_NOTIFICATION
case _:
assert_never(option)
result = user32.MessageBoxW(hWnd, text, caption, uType)
match result:
case 1: # IDOK
return 'ok'
case 2: # IDCANCEL
return 'cancel'
case 3: # IDABORT
return 'abort'
case 4: # IDRETRY
return 'retry'
case 5: # IDIGNORE
return 'ignore'
case 6: # IDYES
return 'yes'
case 7: # IDNO
return 'no'
case 8: # IDCLOSE
return 'close'
case 9: # IDHELP
return 'help'
case 10: # IDTRYAGAIN
return 'try_again'
case 11: # IDCONTINUE
return 'continue'
case _:
# 对于标准消息框,不应发生这种情况
raise RuntimeError(f"Unknown MessageBox return code: {result}")
if __name__ == '__main__':
# 示例用法
response = message_box(
None,
"是否要退出程序?",
"确认",
buttons='yes_no',
icon='question'
)
if response == 'yes':
print("程序退出。")
else:
print("程序继续运行。")
message_box(
None,
"操作已完成。",
"通知",
buttons='ok',
icon='information'
)

View File

@ -0,0 +1,469 @@
import ctypes
from ctypes import wintypes
import time
from typing import List, Tuple, Optional
from typing import Literal
__all__ = [
"TaskDialog",
"TDCBF_OK_BUTTON", "TDCBF_YES_BUTTON", "TDCBF_NO_BUTTON", "TDCBF_CANCEL_BUTTON",
"TDCBF_RETRY_BUTTON", "TDCBF_CLOSE_BUTTON",
"IDOK", "IDCANCEL", "IDABORT", "IDRETRY", "IDIGNORE", "IDYES", "IDNO", "IDCLOSE",
"TD_WARNING_ICON", "TD_ERROR_ICON", "TD_INFORMATION_ICON", "TD_SHIELD_ICON"
]
# --- Windows API 常量定义 ---
# 常用按钮
TDCBF_OK_BUTTON = 0x0001
TDCBF_YES_BUTTON = 0x0002
TDCBF_NO_BUTTON = 0x0004
TDCBF_CANCEL_BUTTON = 0x0008
TDCBF_RETRY_BUTTON = 0x0010
TDCBF_CLOSE_BUTTON = 0x0020
# 对话框返回值
IDOK = 1
IDCANCEL = 2
IDABORT = 3
IDRETRY = 4
IDIGNORE = 5
IDYES = 6
IDNO = 7
IDCLOSE = 8
# 标准图标 (使用 MAKEINTRESOURCE 宏)
def MAKEINTRESOURCE(i: int) -> wintypes.LPWSTR:
return wintypes.LPWSTR(i)
TD_WARNING_ICON = MAKEINTRESOURCE(65535)
TD_ERROR_ICON = MAKEINTRESOURCE(65534)
TD_INFORMATION_ICON = MAKEINTRESOURCE(65533)
TD_SHIELD_ICON = MAKEINTRESOURCE(65532)
# Task Dialog 标志
TDF_ENABLE_HYPERLINKS = 0x0001
TDF_USE_HICON_MAIN = 0x0002
TDF_USE_HICON_FOOTER = 0x0004
TDF_ALLOW_DIALOG_CANCELLATION = 0x0008
TDF_USE_COMMAND_LINKS = 0x0010
TDF_USE_COMMAND_LINKS_NO_ICON = 0x0020
TDF_EXPAND_FOOTER_AREA = 0x0040
TDF_EXPANDED_BY_DEFAULT = 0x0080
TDF_VERIFICATION_FLAG_CHECKED = 0x0100
TDF_SHOW_PROGRESS_BAR = 0x0200
TDF_SHOW_MARQUEE_PROGRESS_BAR = 0x0400
TDF_CALLBACK_TIMER = 0x0800
TDF_POSITION_RELATIVE_TO_WINDOW = 0x1000
TDF_RTL_LAYOUT = 0x2000
TDF_NO_DEFAULT_RADIO_BUTTON = 0x4000
TDF_CAN_BE_MINIMIZED = 0x8000
# Task Dialog 通知
TDN_CREATED = 0
TDN_NAVIGATED = 1
TDN_BUTTON_CLICKED = 2
TDN_HYPERLINK_CLICKED = 3
TDN_TIMER = 4
TDN_DESTROYED = 5
TDN_RADIO_BUTTON_CLICKED = 6
TDN_DIALOG_CONSTRUCTED = 7
TDN_VERIFICATION_CLICKED = 8
TDN_HELP = 9
TDN_EXPANDO_BUTTON_CLICKED = 10
# Windows 消息
WM_USER = 0x0400
TDM_SET_PROGRESS_BAR_POS = WM_USER + 114
CommonButtonLiteral = Literal["ok", "yes", "no", "cancel", "retry", "close"]
IconLiteral = Literal["warning", "error", "information", "shield"]
# --- C 结构体定义 (使用 ctypes) ---
class TASKDIALOG_BUTTON(ctypes.Structure):
_pack_ = 1
_fields_ = [("nButtonID", ctypes.c_int),
("pszButtonText", wintypes.LPCWSTR)]
# 定义回调函数指针原型
PFTASKDIALOGCALLBACK = ctypes.WINFUNCTYPE(
ctypes.HRESULT, # 返回值
wintypes.HWND, # hwnd
ctypes.c_uint, # msg
ctypes.c_size_t, # wParam
ctypes.c_size_t, # lParam
ctypes.c_ssize_t # lpRefData
)
class TASKDIALOGCONFIG(ctypes.Structure):
_pack_ = 1
_fields_ = [
("cbSize", ctypes.c_uint),
("hwndParent", wintypes.HWND),
("hInstance", wintypes.HINSTANCE),
("dwFlags", ctypes.c_uint),
("dwCommonButtons", ctypes.c_uint),
("pszWindowTitle", wintypes.LPCWSTR),
("pszMainIcon", wintypes.LPCWSTR),
("pszMainInstruction", wintypes.LPCWSTR),
("pszContent", wintypes.LPCWSTR),
("cButtons", ctypes.c_uint),
("pButtons", ctypes.POINTER(TASKDIALOG_BUTTON)),
("nDefaultButton", ctypes.c_int),
("cRadioButtons", ctypes.c_uint),
("pRadioButtons", ctypes.POINTER(TASKDIALOG_BUTTON)),
("nDefaultRadioButton", ctypes.c_int),
("pszVerificationText", wintypes.LPCWSTR),
("pszExpandedInformation", wintypes.LPCWSTR),
("pszExpandedControlText", wintypes.LPCWSTR),
("pszCollapsedControlText", wintypes.LPCWSTR),
("pszFooterIcon", wintypes.LPCWSTR),
("pszFooter", wintypes.LPCWSTR),
("pfCallback", PFTASKDIALOGCALLBACK), # 使用定义好的原型
("lpCallbackData", ctypes.c_ssize_t),
("cxWidth", ctypes.c_uint)
]
# --- 加载 comctl32.dll 并定义函数原型 ---
comctl32 = ctypes.WinDLL('comctl32')
user32 = ctypes.WinDLL('user32')
TaskDialogIndirect = comctl32.TaskDialogIndirect
TaskDialogIndirect.restype = ctypes.HRESULT
TaskDialogIndirect.argtypes = [
ctypes.POINTER(TASKDIALOGCONFIG),
ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(wintypes.BOOL)
]
# --- Python 封装类 ---
class TaskDialog:
"""
一个用于显示 Windows TaskDialog Python 封装类
支持自定义按钮单选按钮进度条验证框等
"""
def __init__(self,
parent_hwnd: Optional[int] = None,
title: str = "Task Dialog",
main_instruction: str = "",
content: str = "",
common_buttons: int | List[CommonButtonLiteral] = TDCBF_OK_BUTTON,
main_icon: Optional[wintypes.LPWSTR | int | IconLiteral] = None,
footer: str = "",
custom_buttons: Optional[List[Tuple[int, str]]] = None,
default_button: int = 0,
radio_buttons: Optional[List[Tuple[int, str]]] = None,
default_radio_button: int = 0,
verification_text: Optional[str] = None,
verification_checked_by_default: bool = False,
show_progress_bar: bool = False,
show_marquee_progress_bar: bool = False
):
"""初始化 TaskDialog 实例。
:param parent_hwnd: 父窗口的句柄
:param title: 对话框窗口的标题
:param main_instruction: 对话框的主要指令文本
:param content: 对话框的详细内容文本
:param common_buttons: 要显示的通用按钮可以是以下两种形式之一
1. TDCBF_* 常量的按位或组合 (例如 TDCBF_OK_BUTTON | TDCBF_CANCEL_BUTTON)
2. 字符串列表支持 "ok", "yes", "no", "cancel", "retry", "close"
:param main_icon: 主图标可以是以下几种形式之一
1. TD_*_ICON 常量之一
2. HICON 句柄
3. 字符串"warning", "error", "information", "shield"
:param footer: 页脚区域显示的文本
:param custom_buttons: 自定义按钮列表每个元组包含 (按钮ID, 按钮文本)
:param default_button: 默认按钮的ID可以是通用按钮ID (例如 IDOK) 或自定义按钮ID
:param radio_buttons: 单选按钮列表每个元组包含 (按钮ID, 按钮文本)
:param default_radio_button: 默认选中的单选按钮的ID
:param verification_text: 验证复选框的文本如果为 None则不显示复选框
:param verification_checked_by_default: 验证复选框是否默认勾选
:param show_progress_bar: 是否显示标准进度条
:param show_marquee_progress_bar: 是否显示跑马灯式进度条
"""
self.config = TASKDIALOGCONFIG()
self.config.cbSize = ctypes.sizeof(TASKDIALOGCONFIG)
self.config.hwndParent = parent_hwnd
self.config.dwFlags = TDF_ALLOW_DIALOG_CANCELLATION | TDF_POSITION_RELATIVE_TO_WINDOW
self.config.dwCommonButtons = self._process_common_buttons(common_buttons)
self.config.pszWindowTitle = title
self.config.pszMainInstruction = main_instruction
self.config.pszContent = content
self.config.pszFooter = footer
self.progress: int = 0
if show_progress_bar or show_marquee_progress_bar:
# 进度条暂时还没实现
raise NotImplementedError("Progress bar is not implemented yet.")
self.config.dwFlags |= TDF_CALLBACK_TIMER
if show_progress_bar:
self.config.dwFlags |= TDF_SHOW_PROGRESS_BAR
else:
self.config.dwFlags |= TDF_SHOW_MARQUEE_PROGRESS_BAR
# 将实例方法转为 C 回调函数指针。
# 必须将其保存为实例成员,否则会被垃圾回收!
self._callback_func_ptr = PFTASKDIALOGCALLBACK(self._callback)
self.config.pfCallback = self._callback_func_ptr
# 将本实例的id作为lpCallbackData传递以便在回调中识别
self.config.lpCallbackData = id(self)
# --- 图标设置 ---
processed_icon = self._process_main_icon(main_icon)
if processed_icon is not None:
if isinstance(processed_icon, wintypes.LPWSTR):
self.config.pszMainIcon = processed_icon
else:
self.config.dwFlags |= TDF_USE_HICON_MAIN
self.config.hMainIcon = processed_icon
# --- 自定义按钮设置 ---
self.custom_buttons_list = []
if custom_buttons:
self.config.cButtons = len(custom_buttons)
button_array_type = TASKDIALOG_BUTTON * len(custom_buttons)
self.custom_buttons_list = button_array_type()
for i, (btn_id, btn_text) in enumerate(custom_buttons):
self.custom_buttons_list[i].nButtonID = btn_id
self.custom_buttons_list[i].pszButtonText = btn_text
self.config.pButtons = self.custom_buttons_list
if default_button:
self.config.nDefaultButton = default_button
# --- 单选按钮设置 ---
self.radio_buttons_list = []
if radio_buttons:
self.config.cRadioButtons = len(radio_buttons)
radio_array_type = TASKDIALOG_BUTTON * len(radio_buttons)
self.radio_buttons_list = radio_array_type()
for i, (btn_id, btn_text) in enumerate(radio_buttons):
self.radio_buttons_list[i].nButtonID = btn_id
self.radio_buttons_list[i].pszButtonText = btn_text
self.config.pRadioButtons = self.radio_buttons_list
if default_radio_button:
self.config.nDefaultRadioButton = default_radio_button
# --- 验证复选框设置 ---
if verification_text:
self.config.pszVerificationText = verification_text
if verification_checked_by_default:
self.config.dwFlags |= TDF_VERIFICATION_FLAG_CHECKED
def _process_common_buttons(self, common_buttons: int | List[CommonButtonLiteral]) -> int:
"""处理 common_buttons 参数,支持常量和字符串列表两种形式"""
if isinstance(common_buttons, int):
# 直接使用 Win32 常量
return common_buttons
elif isinstance(common_buttons, list):
# 处理字符串列表
result = 0
for button in common_buttons:
# 使用 match 和 assert_never 进行类型检查
match button:
case "ok":
result |= TDCBF_OK_BUTTON
case "yes":
result |= TDCBF_YES_BUTTON
case "no":
result |= TDCBF_NO_BUTTON
case "cancel":
result |= TDCBF_CANCEL_BUTTON
case "retry":
result |= TDCBF_RETRY_BUTTON
case "close":
result |= TDCBF_CLOSE_BUTTON
case _:
# 这在实际中不会发生,因为类型检查会阻止它
from typing import assert_never
assert_never(button)
return result
else:
raise TypeError("common_buttons must be either an int or a list of strings")
def _process_main_icon(self, main_icon: Optional[wintypes.LPWSTR | int | IconLiteral]) -> Optional[wintypes.LPWSTR | int]:
"""处理 main_icon 参数,支持常量和字符串两种形式"""
if main_icon is None:
return None
elif isinstance(main_icon, (wintypes.LPWSTR, int)):
# 直接使用 Win32 常量或 HICON 句柄
return main_icon
elif isinstance(main_icon, str):
# 处理字符串
match main_icon:
case "warning":
return TD_WARNING_ICON
case "error":
return TD_ERROR_ICON
case "information":
return TD_INFORMATION_ICON
case "shield":
return TD_SHIELD_ICON
case _:
# 这在实际中不会发生,因为类型检查会阻止它
from typing import assert_never
assert_never(main_icon)
else:
raise TypeError("main_icon must be None, a Windows constant, or a string")
def _callback(self, hwnd: wintypes.HWND, msg: int, wParam: int, lParam: int, lpRefData: int) -> int:
# 仅当 lpRefData 指向的是当前这个对象实例时才处理
if lpRefData != id(self):
return 0 # S_OK
if msg == TDN_TIMER:
# 更新进度条
if self.progress < 100:
self.progress += 5
# 发送消息给对话框来更新进度条位置
user32.SendMessageW(hwnd, TDM_SET_PROGRESS_BAR_POS, self.progress, 0)
else:
# 示例进度达到100%后可以模拟点击OK按钮关闭对话框
# from ctypes import wintypes
# user32.PostMessageW(hwnd, wintypes.UINT(1125), IDOK, 0) # TDM_CLICK_BUTTON
pass
elif msg == TDN_DESTROYED:
# 对话框已销毁
pass
return 0 # S_OK
def show(self) -> Tuple[int, int, bool]:
"""
显示对话框并返回用户交互的结果
:return: 一个元组 (button_id, radio_button_id, verification_checked)
- button_id: 用户点击的按钮ID (例如 IDOK, IDCANCEL)
- radio_button_id: 用户选择的单选按钮的ID
- verification_checked: 验证复选框是否被勾选 (True/False)
"""
pnButton = ctypes.c_int(0)
pnRadioButton = ctypes.c_int(0)
pfVerificationFlagChecked = wintypes.BOOL(False)
hr = TaskDialogIndirect(
ctypes.byref(self.config),
ctypes.byref(pnButton),
ctypes.byref(pnRadioButton),
ctypes.byref(pfVerificationFlagChecked)
)
if hr == 0: # S_OK
return pnButton.value, pnRadioButton.value, bool(pfVerificationFlagChecked.value)
else:
raise ctypes.WinError(hr)
# --- 示例用法 ---
if __name__ == '__main__':
print("--- 示例 1: 简单信息框 ---")
dlg_simple = TaskDialog(
title="操作成功",
main_instruction="您的操作已成功完成。",
content="文件已保存到您的文档目录。",
common_buttons=["ok"],
main_icon="information"
)
result_simple, _, _ = dlg_simple.show()
print(f"用户点击了按钮: {result_simple} (1=OK)\n")
print("--- 示例 2: 确认框 ---")
dlg_confirm = TaskDialog(
title="确认删除",
main_instruction="您确定要永久删除这个文件吗?",
content="这个操作无法撤销。文件将被立即删除。",
common_buttons=["yes", "no", "cancel"],
main_icon="warning",
default_button=IDNO
)
result_confirm, _, _ = dlg_confirm.show()
if result_confirm == IDYES:
print("用户选择了“是”。")
elif result_confirm == IDNO:
print("用户选择了“否”。")
elif result_confirm == IDCANCEL:
print("用户选择了“取消”。")
print(f"返回的按钮ID: {result_confirm}\n")
# 示例 3
print("--- 示例 3: 自定义按钮 ---")
CUSTOM_BUTTON_SAVE_ID = 101
CUSTOM_BUTTON_DONT_SAVE_ID = 102
my_buttons = [
(CUSTOM_BUTTON_SAVE_ID, "保存并退出"),
(CUSTOM_BUTTON_DONT_SAVE_ID, "不保存直接退出")
]
dlg_custom = TaskDialog(
title="未保存的更改",
main_instruction="文档中有未保存的更改,您想如何处理?",
custom_buttons=my_buttons,
common_buttons=["cancel"],
main_icon="warning",
footer="这是一个重要的提醒!"
)
result_custom, _, _ = dlg_custom.show()
if result_custom == CUSTOM_BUTTON_SAVE_ID:
print("用户选择了“保存并退出”。")
elif result_custom == CUSTOM_BUTTON_DONT_SAVE_ID:
print("用户选择了“不保存直接退出”。")
elif result_custom == IDCANCEL:
print("用户选择了“取消”。")
print(f"返回的按钮ID: {result_custom}\n")
# 示例 4: 带单选按钮和验证框的对话框
print("--- 示例 4: 单选按钮和验证框 ---")
RADIO_BTN_WORD_ID = 201
RADIO_BTN_EXCEL_ID = 202
RADIO_BTN_PDF_ID = 203
radio_buttons = [
(RADIO_BTN_WORD_ID, "保存为 Word 文档 (.docx)"),
(RADIO_BTN_EXCEL_ID, "保存为 Excel 表格 (.xlsx)"),
(RADIO_BTN_PDF_ID, "导出为 PDF 文档 (.pdf)")
]
dlg_radio = TaskDialog(
title="选择导出格式",
main_instruction="请选择您想要导出的文件格式。",
content="选择一个格式后,点击“确定”继续。",
common_buttons=["ok", "cancel"],
main_icon="information",
radio_buttons=radio_buttons,
default_radio_button=RADIO_BTN_PDF_ID, # 默认选中PDF
verification_text="设为我的默认导出选项",
verification_checked_by_default=True
)
btn_id, radio_id, checked = dlg_radio.show()
if btn_id == IDOK:
print(f"用户点击了“确定”。")
if radio_id == RADIO_BTN_WORD_ID:
print("选择了导出为 Word。")
elif radio_id == RADIO_BTN_EXCEL_ID:
print("选择了导出为 Excel。")
elif radio_id == RADIO_BTN_PDF_ID:
print("选择了导出为 PDF。")
if checked:
print("用户勾选了“设为我的默认导出选项”。")
else:
print("用户未勾选“设为我的默认导出选项”。")
else:
print("用户点击了“取消”。")
print(f"返回的按钮ID: {btn_id}, 单选按钮ID: {radio_id}, 验证框状态: {checked}\n")

View File

@ -4,7 +4,9 @@ import uuid
import re
import logging
from typing import Literal
from pydantic import BaseModel, ConfigDict, field_serializer, field_validator
from pydantic import BaseModel, ConfigDict, ValidationError, field_serializer, field_validator
from kotonebot.kaa.errors import ProduceSolutionInvalidError, ProduceSolutionNotFoundError
from .const import ProduceAction, RecommendCardDetectionMode
@ -217,17 +219,17 @@ class ProduceSolutionManager:
:param id: 方案ID
:return: 方案对象
:raises FileNotFoundError: 当方案不存在时
:raises ProduceSloutionNotFoundError: 当方案不存在时
"""
file_path = self._find_file_path_by_id(id)
if not file_path:
raise FileNotFoundError(f"Solution with id '{id}' not found")
raise ProduceSolutionNotFoundError(id)
try:
with open(file_path, 'r', encoding='utf-8') as f:
return ProduceSolution.model_validate_json(f.read())
except Exception as e:
raise FileNotFoundError(f"Failed to read solution with id '{id}': {e}")
except ValidationError as e:
raise ProduceSolutionInvalidError(id, file_path, e)
def duplicate(self, id: str) -> ProduceSolution:
"""
@ -235,7 +237,7 @@ class ProduceSolutionManager:
:param id: 要复制的方案ID
:return: 新的方案对象具有新的ID和名称
:raises FileNotFoundError: 当原方案不存在时
:raises ProduceSolutionNotFoundError: 当原方案不存在时
"""
original = self.read(id)

View File

@ -64,6 +64,8 @@ class ContestConfig(ConfigBaseModel):
select_which_contestant: Literal[1, 2, 3] = 1
"""选择第几个挑战者"""
when_no_set: Literal['remind', 'wait', 'auto_set', 'auto_set_silent'] = 'remind'
"""竞赛队伍未编成时应该remind=通知我并跳过竞赛wait=提醒我并等待手动编成auto_set=使用自动编成并提醒auto_set_silent=使用自动编成不提醒"""
class ProduceConfig(ConfigBaseModel):

38
kotonebot/kaa/errors.py Normal file
View File

@ -0,0 +1,38 @@
import os
from kotonebot.errors import UserFriendlyError
class KaaError(Exception):
pass
class KaaUserFriendlyError(UserFriendlyError, KaaError):
def __init__(self, message: str, help_link: str):
super().__init__(message, [
(0, '打开帮助', lambda: os.startfile(help_link)),
(1, '知道了', lambda: None)
])
class ProduceSolutionNotFoundError(KaaUserFriendlyError):
def __init__(self, solution_id: str):
self.solution_id = solution_id
super().__init__(
f'培育方案「{solution_id}」不存在,请检查设置是否正确。',
'https://kdocs.cn/l/cetCY8mGKHLj?linkname=saPrDAmMd4'
)
class ProduceSolutionInvalidError(KaaUserFriendlyError):
def __init__(self, solution_id: str, file_path: str, reason: Exception):
self.solution_id = solution_id
self.reason = reason
super().__init__(
f'培育方案「{solution_id}」(路径 {file_path})存在无效配置,载入失败。',
'https://kdocs.cn/l/cetCY8mGKHLj?linkname=xnLUW1YYKz'
)
class IdolCardNotFoundError(KaaUserFriendlyError):
def __init__(self, skin_id: str):
self.skin_id = skin_id
super().__init__(
f'未找到 ID 为「{skin_id}」的偶像卡。请检查游戏内偶像皮肤与培育方案中偶像皮肤是否一致。',
'https://kdocs.cn/l/cetCY8mGKHLj?linkname=cySASqoPGj'
)

View File

@ -14,6 +14,7 @@ from typing import List, Dict, Tuple, Literal, Generator, Callable, Any, get_arg
import cv2
import gradio as gr
from kotonebot.kaa.errors import ProduceSolutionNotFoundError
from kotonebot.kaa.main import Kaa
from kotonebot.kaa.db import IdolCard
from kotonebot.backend.context.context import vars
@ -51,7 +52,7 @@ ConfigKey = Literal[
'mini_live_reassign', 'mini_live_duration',
'online_live_reassign', 'online_live_duration',
'contest_enabled',
'select_which_contestant',
'select_which_contestant', 'when_no_set',
# produce
'produce_enabled', 'selected_solution_id', 'produce_count',
@ -1250,6 +1251,20 @@ class KotoneBotUI:
interactive=True,
info=ContestConfig.model_fields['select_which_contestant'].description
)
when_no_set_choices = [
("通知我并跳过竞赛", "remind"),
("提醒我并等待手动编成", "wait"),
("使用自动编成并提醒我", "auto_set"),
("使用自动编成", "auto_set_silent")
]
when_no_set = gr.Dropdown(
choices=when_no_set_choices,
value=self.current_config.options.contest.when_no_set,
label="竞赛队伍未编成时",
interactive=True,
info=ContestConfig.model_fields['when_no_set'].description
)
contest_enabled.change(
fn=lambda x: gr.Group(visible=x),
inputs=[contest_enabled],
@ -1259,10 +1274,12 @@ class KotoneBotUI:
def set_config(config: BaseConfig, data: dict[ConfigKey, Any]) -> None:
config.contest.enabled = data['contest_enabled']
config.contest.select_which_contestant = data['select_which_contestant']
config.contest.when_no_set = data['when_no_set']
return set_config, {
'contest_enabled': contest_enabled,
'select_which_contestant': select_which_contestant
'select_which_contestant': select_which_contestant,
'when_no_set': when_no_set
}
def _create_produce_settings(self) -> ConfigBuilderReturnValue:
@ -1353,7 +1370,7 @@ class KotoneBotUI:
if selected_solution_id:
try:
current_solution = solution_manager.read(selected_solution_id)
except FileNotFoundError:
except ProduceSolutionNotFoundError:
pass
if current_solution is None:
@ -1675,7 +1692,7 @@ class KotoneBotUI:
gr.Checkbox(value=solution.data.skip_commu, visible=True),
gr.Button(visible=True), # save_solution_btn
]
except FileNotFoundError:
except ProduceSolutionNotFoundError:
gr.Warning(f"培育方案 {solution_id} 不存在")
return on_solution_change(None)
except Exception as e:

View File

@ -2,12 +2,15 @@
import logging
from gettext import gettext as _
from kotonebot.errors import StopCurrentTask
from kotonebot.kaa.tasks import R
from kotonebot.kaa.config import conf
from kotonebot.kaa.game_ui import WhiteFilter
from kotonebot.kaa.game_ui import WhiteFilter, dialog
from ..actions.scenes import at_home, goto_home
from ..actions.loading import wait_loading_end
from kotonebot import device, image, ocr, color, action, task, user, rect_expand, sleep, contains, Interval
from kotonebot import device, image, ocr, color, action, task, rect_expand, sleep, contains, Interval
from kotonebot.backend.context.context import vars
from kotonebot.ui import user as ui_user
logger = logging.getLogger(__name__)
@ -70,11 +73,39 @@ def handle_challenge() -> bool:
# 记忆未编成 [screenshots/contest/no_memo.png]
if image.find(R.Daily.TextContestNoMemory):
logger.debug('Memory not set. Using auto-compilation.')
user.warning('竞赛未编成', _('记忆未编成。将使用自动编成。'), once=True)
if image.find(R.Daily.ButtonContestChallenge):
device.click()
return True
logger.debug('Memory not set.')
when_no_set = conf().contest.when_no_set
auto_compilation = False
match when_no_set:
case 'remind':
# 关闭编成提示弹窗
dialog.expect_no(msg='Closed memory not set dialog.')
ui_user.warning('竞赛未编成', '已跳过此次竞赛任务。')
logger.info('Contest skipped due to memory not set (remind mode).')
raise StopCurrentTask
case 'wait':
dialog.expect_no(msg='Closed memory not set dialog.')
ui_user.warning('竞赛未编成', '已自动暂停,请手动编成后返回至挑战开始页,并点击网页上「恢复」按钮或使用快捷键继续执行。')
vars.flow.request_pause(wait_resume=True)
logger.info('Contest paused due to memory not set (wait mode).')
return True
case 'auto_set' | 'auto_set_silent':
if when_no_set == 'auto_set':
ui_user.warning('竞赛未编成', '将使用自动编成。', once=True)
logger.debug('Using auto-compilation with notification.')
else: # auto_set_silent
logger.debug('Using auto-compilation silently.')
auto_compilation = True
case _:
logger.warning(f'Unknown value for contest.when_no_set: {when_no_set}, fallback to auto.')
logger.debug('Using auto-compilation silently.')
auto_compilation = True
if auto_compilation:
if image.find(R.Daily.ButtonContestChallenge):
device.click()
return True
# 勾选跳过所有
# [screenshots/contest/contest2.png]
@ -85,7 +116,7 @@ def handle_challenge() -> bool:
# 跳过所有
# [screenshots/contest/contest1.png]
if image.find(R.Daily.ButtonIconSkip, preprocessors=[WhiteFilter()]):
if image.find(R.Daily.ButtonIconSkip, preprocessors=[WhiteFilter()], threshold=0.7):
logger.debug('Skipping all.')
device.click()
return True

View File

@ -8,7 +8,6 @@ from kotonebot.kaa.config import conf, DailyMoneyShopItems
from kotonebot.primitives.geometry import Point
from kotonebot.util import Countdown, cropped
from kotonebot import task, device, image, action, sleep
from kotonebot.backend.dispatch import SimpleDispatcher
from ..actions.scenes import goto_home, goto_shop, at_daily_shop
logger = logging.getLogger(__name__)

View File

@ -5,6 +5,7 @@ import logging
import _thread
import threading
from kotonebot.backend.bot import PostTaskContext
from kotonebot.ui import user
from ..kaa_context import instance
from kotonebot.kaa.config import Priority, conf
@ -35,8 +36,8 @@ def windows_close():
os.system('taskkill /f /im gakumas.exe')
logger.info("Game closed successfully")
@task('关闭游戏', priority=Priority.END_GAME)
def end_game():
@task('关闭游戏', priority=Priority.END_GAME, run_at='post')
def end_game(ctx: PostTaskContext):
"""
游戏结束时执行的任务
"""
@ -101,4 +102,4 @@ if __name__ == '__main__':
conf().end_game.kill_game = True
conf().end_game.kill_dmm = True
conf().end_game.kill_emulator = True
end_game()
end_game(PostTaskContext(False, None))

View File

@ -12,9 +12,8 @@ from .cards import do_cards, CardDetectResult
from ..actions.commu import handle_unread_commu
from kotonebot.errors import UnrecoverableError
from kotonebot.util import Countdown, Interval, cropped
from kotonebot.backend.dispatch import DispatcherContext
from kotonebot.backend.loop import Loop
from kotonebot.kaa.config import ProduceAction, RecommendCardDetectionMode
from kotonebot.kaa.config import conf
from ..produce.common import until_acquisition_clear, commu_event, fast_acquisitions
from kotonebot import ocr, device, contains, image, regex, action, sleep, wait
from ..produce.non_lesson_actions import (
@ -705,8 +704,8 @@ ProduceStage = Literal[
'unknown', # 未知场景
]
@action('检测当前培育场景', dispatcher=True)
def detect_produce_scene(ctx: DispatcherContext) -> ProduceStage:
@action('检测当前培育场景')
def detect_produce_scene() -> ProduceStage:
"""
判断当前是培育的什么阶段并开始 Regular 培育
@ -715,31 +714,35 @@ def detect_produce_scene(ctx: DispatcherContext) -> ProduceStage:
"""
logger.info("Detecting current produce stage...")
# 行动场景
texts = ocr.ocr()
if (
image.find_multi([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
])
):
logger.info("Detection result: At action scene.")
ctx.finish()
return 'action'
elif texts.where(regex('CLEARまで|PERFECTまで')):
logger.info("Detection result: At practice ongoing.")
ctx.finish()
return 'practice-ongoing'
elif is_exam_scene():
logger.info("Detection result: At exam scene.")
ctx.finish()
return 'exam-ongoing'
else:
if fast_acquisitions():
return 'unknown'
if commu_event():
return 'unknown'
return 'unknown'
for _ in Loop():
# 行动场景
texts = ocr.ocr()
if (
image.find_multi([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
])
):
logger.info("Detection result: At action scene.")
return 'action'
elif texts.where(regex('CLEARまで|PERFECTまで')):
logger.info("Detection result: At practice ongoing.")
return 'practice-ongoing'
elif is_exam_scene():
logger.info("Detection result: At exam scene.")
return 'exam-ongoing'
else:
if fast_acquisitions():
# 继续循环检测
pass
elif commu_event():
# 继续循环检测
pass
else:
return 'unknown'
# 如果没有返回,说明需要继续检测
sleep(0.5) # 等待一段时间再重新检测
return 'unknown'
@action('开始 Hajime 培育')
def hajime_from_stage(stage: ProduceStage, type: Literal['regular', 'pro', 'master'], week: int):

View File

@ -15,6 +15,7 @@ from kotonebot.kaa.game_ui.idols_overview import locate_idol, match_idol
from ..produce.in_purodyuusu import hajime_pro, hajime_regular, hajime_master, resume_pro_produce, resume_regular_produce, \
resume_master_produce
from kotonebot import device, image, ocr, task, action, sleep, contains, regex
from kotonebot.kaa.errors import IdolCardNotFoundError
logger = logging.getLogger(__name__)
@ -58,7 +59,7 @@ def select_idol(skin_id: str):
# 选择偶像
pos = locate_idol(skin_id)
if pos is None:
raise ValueError(f"Idol {skin_id} not found.")
raise IdolCardNotFoundError(skin_id)
# 确认
it.reset()
while btn_confirm := image.find(R.Common.ButtonConfirmNoIcon):

View File

@ -12,6 +12,7 @@ from .actions.scenes import at_home, goto_home
from .actions.commu import handle_unread_commu
from kotonebot.errors import GameUpdateNeededError
from kotonebot import task, action, sleep, device, image, ocr, config
from kotonebot.backend.context.context import vars
logger = logging.getLogger(__name__)
@ -170,6 +171,7 @@ def windows_launch():
# 等待游戏窗口出现
it = Interval()
while True:
vars.flow.check()
if ahk.find_window(title='gakumas', title_match_mode=3):
logger.debug('Game window found.')
break

View File

@ -4,6 +4,7 @@ import time
import cv2
from cv2.typing import MatLike
from win11toast import toast
from .pushkit import Wxpusher
from .. import logging
@ -25,17 +26,6 @@ def retry(func):
continue
return wrapper
def ask(
question: str,
options: list[str],
*,
timeout: float = -1,
) -> bool:
"""
询问用户
"""
raise NotImplementedError
def _save_local(
title: str,
message: str,
@ -74,6 +64,43 @@ def push(
logger.warning('push remote message failed: %s', e)
_save_local(title, message, images)
def _show_toast(title: str, message: str | None = None, buttons: list[str] | None = None):
"""
统一的 Toast 通知函数
:param title: 通知标题
:param message: 通知消息内容
:param buttons: 按钮列表如果提供则显示带按钮的通知
"""
try:
if buttons:
logger.verbose('showing toast notification with buttons: %s - %s', title, message or '')
toast(title, message or '', buttons=buttons)
else:
# 如果没有 message只显示 title
if message:
logger.verbose('showing toast notification: %s - %s', title, message)
toast(title, message)
else:
logger.verbose('showing toast notification: %s', title)
toast(title)
except Exception as e:
logger.warning('toast notification failed: %s', e)
def ask(
question: str,
options: list[tuple[str, str]],
*,
timeout: float = -1,
) -> str:
"""
询问用户
"""
# 将选项转换为按钮列表
buttons = [option[1] for option in options]
_show_toast("琴音小助手询问", question, buttons=buttons)
raise NotImplementedError
def info(
title: str,
message: str | None = None,
@ -83,6 +110,7 @@ def info(
):
logger.info('user.info: %s', message)
push('KAA' + title, message, images=images)
_show_toast('KAA' + title, message)
def warning(
title: str,
@ -98,7 +126,8 @@ def warning(
:param once: 每次运行是否只显示一次
"""
logger.warning('user.warning: %s', message)
push("KAA 警告:" + title, message, images=images)
push("琴音小助手警告:" + title, message, images=images)
_show_toast("琴音小助手警告:" + title, message)
def error(
title: str,
@ -111,4 +140,5 @@ def error(
错误信息
"""
logger.error('user.error: %s', message)
push("KAA 错误:" + title, message, images=images)
push("琴音小助手错误:" + title, message, images=images)
_show_toast("琴音小助手错误:" + title, message)

View File

@ -41,6 +41,7 @@ dependencies = [
# TODO: move these dependencies to optional-dependencies
"pywin32==310",
"ahk==1.8.3",
"win11toast==0.35", # For Windows Toast Notification
]

View File

@ -1,4 +1,5 @@
-r requirements.txt
pywin32==310
ahk==1.8.3
ahk==1.8.3
win11toast==0.35

View File

@ -12,6 +12,7 @@ from kotonebot.kaa.config.produce import (
ProduceSolutionManager
)
from kotonebot.kaa.config.const import ProduceAction, RecommendCardDetectionMode
from kotonebot.kaa.errors import ProduceSolutionNotFoundError
class TestProduceData(TestCase):
@ -360,7 +361,7 @@ class TestProduceSolutionManager(TestCase):
def test_read_nonexistent_solution(self):
"""测试读取不存在的方案"""
with self.assertRaises(FileNotFoundError) as context:
with self.assertRaises(ProduceSolutionNotFoundError) as context:
self.manager.read('nonexistent_id')
self.assertIn("Solution with id 'nonexistent_id' not found", str(context.exception))
@ -429,7 +430,7 @@ class TestProduceSolutionManager(TestCase):
def test_duplicate_nonexistent_solution(self):
"""测试复制不存在的方案"""
with self.assertRaises(FileNotFoundError):
with self.assertRaises(ProduceSolutionNotFoundError):
self.manager.duplicate('nonexistent_id')
def test_corrupted_json_handling(self):