feat(core): 新增全局暂停脚本执行功能

This commit is contained in:
XcantloadX 2025-06-08 10:32:53 +08:00
parent 2452c8cc09
commit a8e1f0d37f
7 changed files with 375 additions and 59 deletions

View File

@ -32,7 +32,7 @@ class RunStatus:
callstack: list[Task | Action] = field(default_factory=list)
def interrupt(self):
vars.interrupted.set()
vars.flow.request_interrupt()
# Modified from https://stackoverflow.com/questions/70982565/how-do-i-make-an-event-listener-with-decorators-in-python
Params = ParamSpec('Params')
@ -158,7 +158,7 @@ class KotoneBot:
d = self._on_create_device()
init_context(config_path=self.config_path, config_type=self.config_type, target_device=d)
self._on_after_init_context()
vars.interrupted.clear()
vars.flow.clear_interrupt()
if by_priority:
tasks = sorted(tasks, key=lambda x: x.priority, reverse=True)
@ -180,7 +180,7 @@ class KotoneBot:
logger.exception('Keyboard interrupt detected.')
for task1 in tasks[tasks.index(task):]:
self.events.task_status_changed.trigger(task1, 'cancelled')
vars.interrupted.clear()
vars.flow.clear_interrupt()
break
# 其他错误
except Exception as e:

View File

@ -3,6 +3,7 @@ import re
import time
import logging
import warnings
import threading
from datetime import datetime
from threading import Event
from typing import (
@ -25,6 +26,7 @@ import cv2
from cv2.typing import MatLike
from kotonebot.client.device import Device
from kotonebot.backend.flow_controller import FlowController
import kotonebot.backend.image as raw_image
from kotonebot.backend.image import (
TemplateMatchResult,
@ -126,8 +128,7 @@ def interruptible(func: Callable[P, T]) -> Callable[P, T]:
"""
def _decorator(*args: P.args, **kwargs: P.kwargs) -> T:
global vars
if vars.interrupted.is_set():
raise KeyboardInterrupt("User requested interrupt.")
vars.flow.check()
return func(*args, **kwargs)
return _decorator
@ -145,15 +146,13 @@ def interruptible_class(cls: Type[T]) -> Type[T]:
def sleep(seconds: float, /):
"""
可中断sleep 函数
可中断和可暂停sleep 函数
建议使用本函数代替 `time.sleep()`
这样能以最快速度响应用户请求中断
这样能以最快速度响应用户请求中断和暂停
"""
global vars
vars.interrupted.wait(timeout=seconds)
if vars.interrupted.is_set():
raise KeyboardInterrupt("User requested interrupt.")
vars.flow.sleep(seconds)
def warn_manual_screenshot_mode(name: str, alternative: str):
"""
@ -175,8 +174,8 @@ def is_manual_screenshot_mode() -> bool:
class ContextGlobalVars:
def __init__(self):
self.__vars = dict[str, Any]()
self.interrupted: Event = Event()
"""用户请求中断事件"""
self.flow: FlowController = FlowController()
"""流程控制器,负责停止、暂停、恢复等操作"""
def __getitem__(self, key: str) -> Any:
return self.__vars[key]
@ -198,6 +197,17 @@ class ContextGlobalVars:
def clear(self):
self.__vars.clear()
self.flow.reset() # 重置流程控制器
def check_flow_control():
"""
统一的流程控制检查函数
检查用户是否请求中断或暂停如果是则相应处理
- 如果请求中断抛出 KeyboardInterrupt 异常
- 如果请求暂停等待直到恢复
"""
vars.flow.check()
class ContextStackVars:
stack: list['ContextStackVars'] = []
@ -716,6 +726,7 @@ class ContextDevice(Device):
"""
截图返回截图数据同时更新当前上下文的截图数据
"""
check_flow_control()
global next_wait, last_screenshot_time, next_wait_time
current = ContextStackVars.ensure_current()
if force:
@ -950,5 +961,4 @@ def manual_context(screenshot_mode: ScreenshotMode = 'auto') -> ManualContextMan
默认情况下Context* 类仅允许在 @task/@action 函数中使用
如果想要在其他地方使用使用此函数手动创建一个上下文
"""
return ManualContextManager(screenshot_mode)
return ManualContextManager(screenshot_mode)

View File

@ -115,7 +115,7 @@ async def run_code(request: RunCodeRequest):
except KeyboardInterrupt as e:
result = {"status": "error", "result": stdout.getvalue(), "message": str(e), "traceback": traceback.format_exc()}
finally:
context_vars.interrupted.clear()
context_vars.flow.clear_interrupt()
event.set()
threading.Thread(target=_runner, daemon=True).start()
await event.wait()
@ -124,8 +124,8 @@ async def run_code(request: RunCodeRequest):
@app.get("/api/code/stop")
async def stop_code():
from kotonebot.backend.context import vars
vars.interrupted.set()
while vars.interrupted.is_set():
vars.flow.request_interrupt()
while vars.flow.is_interrupted:
await asyncio.sleep(0.1)
return {"status": "ok"}

View File

@ -0,0 +1,195 @@
import time
import logging
import threading
from typing import Literal
logger = logging.getLogger(__name__)
class FlowController:
"""
一个用于控制任务执行流程如停止暂停恢复的类
这个类是线程安全的提供了以下功能
* 停止任务执行通过中断信号
* 暂停/恢复任务执行
* 可中断和可暂停的 sleep 功能
* 流程状态检查
使用方法::
controller = FlowController()
# 在任务的关键路径上调用检查
controller.check()
# 使用可控制的 sleep
controller.sleep(1.0)
# 外部控制
controller.request_pause() # 暂停
controller.request_resume() # 恢复
controller.request_stop() # 停止
"""
def __init__(self):
self.interrupt_event: threading.Event = threading.Event()
"""中断事件,用于停止任务"""
self.paused: bool = False
"""暂停标志"""
self.pause_condition: threading.Condition = threading.Condition()
"""暂停条件变量,用于线程间同步"""
def check(self) -> None:
"""
检查当前流程状态
如果收到停止请求则抛出 KeyboardInterrupt 异常
如果收到暂停请求则阻塞直到恢复
这是核心的检查点方法应在任务的关键路径上如循环或等待前调用
:raises KeyboardInterrupt: 当收到停止请求时
"""
# 优先检查中断信号
if self.interrupt_event.is_set():
raise KeyboardInterrupt("User requested interrupt.")
# 检查暂停状态
with self.pause_condition:
while self.paused:
self.pause_condition.wait()
def sleep(self, seconds: float) -> None:
"""
一个可被中断和暂停的 sleep 方法
与标准的 time.sleep() 不同这个方法会响应停止和暂停请求
在暂停状态下计时器会暂停恢复后继续计时
:param seconds: 睡眠时间
:raises KeyboardInterrupt: 当收到停止请求时
"""
with self.pause_condition:
end_time = time.time() + seconds
while True:
self.check() # 每次循环都检查状态
remaining = end_time - time.time()
if remaining <= 0:
break
# 等待指定时间或直到被唤醒
self.pause_condition.wait(timeout=remaining)
# 结束后再次检查状态
self.check()
def request_interrupt(self) -> None:
"""
请求停止任务
设置中断信号所有正在执行的任务将在下一个检查点停止
停止的优先级高于暂停
"""
logger.info('Interrupt requested.')
self.interrupt_event.set()
def request_pause(self) -> None:
"""
请求暂停任务
设置暂停标志所有正在执行的任务将在下一个检查点暂停
如果任务已经暂停此操作无效果
"""
with self.pause_condition:
if not self.paused:
logger.info('Pause requested.')
self.paused = True
def request_resume(self) -> None:
"""
请求恢复任务
清除暂停标志并通知所有等待的线程恢复执行
如果任务没有暂停此操作无效果
"""
with self.pause_condition:
if self.paused:
logger.info('Resume requested.')
self.paused = False
self.pause_condition.notify_all()
def toggle_pause(self) -> bool:
"""
切换暂停/恢复状态
:returns: 操作后的暂停状态True 表示已暂停False 表示已恢复
"""
with self.pause_condition:
logger.info('Pause toggled.')
if self.paused:
self.paused = False
self.pause_condition.notify_all()
return False
else:
self.paused = True
return True
def clear_interrupt(self) -> None:
"""
清除中断信号
用于任务正常结束或重启时重置状态
通常在开始新任务前调用
"""
self.interrupt_event.clear()
logger.info('Interrupt cleared.')
def reset(self) -> None:
"""
重置流程控制器到初始状态
清除所有信号和状态相当于重新创建一个新的控制器
"""
self.interrupt_event.clear()
with self.pause_condition:
if self.paused:
self.paused = False
self.pause_condition.notify_all()
logger.info('FlowController reset.')
@property
def is_interrupted(self) -> bool:
"""
检查是否收到中断请求
:returns: True 表示已收到中断请求
"""
return self.interrupt_event.is_set()
@property
def is_paused(self) -> bool:
"""
检查是否处于暂停状态
:returns: True 表示当前处于暂停状态
"""
return self.paused
@property
def status(self) -> Literal['running', 'paused', 'interrupted']:
"""
获取当前状态的字符串描述
:returns: 状态描述可能的值'running', 'paused', 'interrupted'
"""
if self.is_interrupted:
return 'interrupted'
elif self.is_paused:
return 'paused'
else:
return 'running'
def __repr__(self) -> str:
return f"FlowController(status='{self.status}')"

View File

@ -7,7 +7,7 @@ import cv2
import win32ui
import win32gui
import numpy as np
from ahk import AHK
from ahk import AHK, MsgBoxIcon
from cv2.typing import MatLike
from ..device import Device
@ -19,14 +19,26 @@ class WindowsImpl(Touchable, Screenshotable):
# TODO: 硬编码路径
self.ahk = AHK(executable_path=str(resources.files('kaa.res.bin') / 'AutoHotkey.exe'))
self.device = device
self.emergency = False
# 设置 DPI aware否则高缩放显示器上返回的坐标会错误
windll.user32.SetProcessDPIAware()
def toggle_emergency():
self.emergency = True
self.ahk.msg_box('已启用紧急暂停模式')
self.ahk.add_hotkey('^F4', toggle_emergency)
# TODO: 这个应该移动到其他地方去
def _stop():
from kotonebot.backend.context.context import vars
vars.flow.request_interrupt()
self.ahk.msg_box('任务已停止。', title='琴音小助手', icon=MsgBoxIcon.EXCLAMATION)
def _toggle_pause():
from kotonebot.backend.context.context import vars
if vars.flow.is_paused:
self.ahk.msg_box('任务即将恢复。\n关闭此消息框后将会继续执行', title='琴音小助手', icon=MsgBoxIcon.EXCLAMATION)
vars.flow.request_resume()
else:
vars.flow.request_pause()
self.ahk.msg_box('任务已暂停。\n关闭此消息框后再按一次快捷键恢复执行。', title='琴音小助手', icon=MsgBoxIcon.EXCLAMATION)
self.ahk.add_hotkey('^F4', _toggle_pause) # Ctrl+F4 暂停/恢复
self.ahk.add_hotkey('^F3', _stop) # Ctrl+F3 停止
self.ahk.start_hotkeys()
# 将点击坐标设置为相对 Client
self.ahk.set_coord_mode('Mouse', 'Client')
@ -60,11 +72,6 @@ class WindowsImpl(Touchable, Screenshotable):
"""将 Client 区域坐标转换为屏幕坐标"""
return win32gui.ClientToScreen(hwnd, (x, y))
def __wait_not_emergency(self):
from time import sleep # TODO: 改为 kotonebot.backend.context.sleep
while self.emergency:
sleep(0.2)
def screenshot(self) -> MatLike:
if not self.ahk.win_is_active('gakumas'):
self.ahk.win_activate('gakumas')
@ -133,7 +140,6 @@ class WindowsImpl(Touchable, Screenshotable):
return 'portrait'
def click(self, x: int, y: int) -> None:
self.__wait_not_emergency()
# x, y = self.__client_to_screen(self.hwnd, x, y)
# (0, 0) 很可能会点到窗口边框上
if x == 0:
@ -146,7 +152,6 @@ class WindowsImpl(Touchable, Screenshotable):
self.ahk.click(x, y)
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float | None = None) -> None:
self.__wait_not_emergency()
if not self.ahk.win_is_active('gakumas'):
self.ahk.win_activate('gakumas')
x1, y1 = int(x1 / self.scale_ratio), int(y1 / self.scale_ratio)

View File

@ -15,6 +15,7 @@ from kotonebot.kaa.db import IdolCard
from kotonebot.config.manager import load_config, save_config
from kotonebot.config.base_config import UserConfig, BackendConfig
from kotonebot.backend.context import task_registry, ContextStackVars
from kotonebot.backend.context.context import vars
from kotonebot.client.host import Mumu12Host, LeidianHost
from kotonebot.kaa.common import (
BaseConfig, APShopItems, CapsuleToysConfig, ClubRewardConfig, PurchaseConfig, ActivityFundsConfig,
@ -182,6 +183,8 @@ class KotoneBotUI:
def __init__(self, kaa: Kaa) -> None:
self.is_running: bool = False
self.single_task_running: bool = False
self.is_stopping: bool = False # 新增:标记是否正在停止过程中
self.is_single_task_stopping: bool = False # 新增:标记单个任务是否正在停止
self._kaa = kaa
self._load_config()
self._setup_kaa()
@ -232,14 +235,20 @@ class KotoneBotUI:
return f"已导出到 {zip_filename}"
def get_button_status(self) -> str:
def get_button_status(self) -> Tuple[str, bool]:
"""获取按钮状态和交互性"""
if not hasattr(self, 'run_status'):
return "启动"
return "启动", True
if not self.run_status.running:
self.is_running = False
return "启动"
return "停止"
self.is_stopping = False # 重置停止状态
return "启动", True
if self.is_stopping:
return "停止中...", False # 停止中时禁用按钮
return "停止", True
def update_task_status(self) -> List[List[str]]:
status_list: List[List[str]] = []
@ -262,6 +271,11 @@ class KotoneBotUI:
def toggle_run(self) -> Tuple[str, List[List[str]]]:
if not self.is_running:
return self.start_run()
# 如果正在停止过程中,忽略重复点击
if self.is_stopping:
return "停止中...", self.update_task_status()
return self.stop_run()
def start_run(self) -> Tuple[str, List[List[str]]]:
@ -270,10 +284,19 @@ class KotoneBotUI:
return "停止", self.update_task_status()
def stop_run(self) -> Tuple[str, List[List[str]]]:
self.is_stopping = True # 设置停止状态
# 如果当前处于暂停状态,先恢复再停止
if vars.flow.is_paused:
gr.Info("检测到任务暂停,正在恢复后停止...")
vars.flow.request_resume()
self.is_running = False
if self._kaa:
self.run_status.interrupt()
return "启动", self.update_task_status()
gr.Info("正在停止任务...")
return "停止中...", self.update_task_status()
def start_single_task(self, task_name: str) -> Tuple[str, str]:
if not task_name:
@ -294,11 +317,48 @@ class KotoneBotUI:
return "停止任务", f"正在执行任务: {task_name}"
def stop_single_task(self) -> Tuple[str, str]:
self.is_single_task_stopping = True # 设置单个任务停止状态
# 如果当前处于暂停状态,先恢复再停止
if vars.flow.is_paused:
gr.Info("检测到任务暂停,正在恢复后停止...")
vars.flow.request_resume()
self.single_task_running = False
if hasattr(self, 'run_status') and self._kaa:
self.run_status.interrupt()
gr.Info("任务已停止")
return "执行任务", "任务已停止"
gr.Info("正在停止任务...")
return "停止中...", "正在停止任务..."
def toggle_pause(self) -> str:
"""切换暂停/恢复状态"""
if vars.flow.is_paused:
vars.flow.request_resume()
gr.Info("任务已恢复")
return "暂停"
else:
vars.flow.request_pause()
gr.Info("任务已暂停")
return "恢复"
def get_pause_button_status(self) -> str:
"""获取暂停按钮的状态"""
if vars.flow.is_paused:
return "恢复"
else:
return "暂停"
def get_pause_button_with_interactive(self) -> gr.Button:
"""获取暂停按钮的状态和交互性"""
try:
text = "恢复" if vars.flow.is_paused else "暂停"
except ValueError:
# ValueError: Forwarded object vars called before initialization.
# TODO: vars.flow.is_paused 应该要可以在脚本正式启动前就能访问
text = '未启动'
# 如果正在停止过程中,禁用暂停按钮
interactive = not (self.is_stopping or self.is_single_task_stopping)
return gr.Button(value=text, interactive=interactive)
def save_settings2(self, return_values: list[ConfigBuilderReturnValue], *args) -> str:
options = BaseConfig()
@ -328,7 +388,8 @@ class KotoneBotUI:
gr.Markdown("## 状态")
with gr.Row():
run_btn = gr.Button("启动", scale=1)
run_btn = gr.Button("启动", scale=2)
pause_btn = gr.Button("暂停", scale=1)
if self._kaa.upgrade_msg:
gr.Markdown('### 配置升级报告')
gr.Markdown(self._kaa.upgrade_msg)
@ -340,19 +401,39 @@ class KotoneBotUI:
label="任务状态"
)
def on_run_click(evt: gr.EventData) -> Tuple[str, List[List[str]]]:
return self.toggle_run()
def on_run_click(evt: gr.EventData) -> Tuple[gr.Button, List[List[str]]]:
result = self.toggle_run()
# 如果正在停止,禁用按钮
interactive = not self.is_stopping
button = gr.Button(value=result[0], interactive=interactive)
return button, result[1]
def on_pause_click(evt: gr.EventData) -> str:
return self.toggle_pause()
run_btn.click(
fn=on_run_click,
outputs=[run_btn, task_status]
)
pause_btn.click(
fn=on_pause_click,
outputs=[pause_btn]
)
# 添加定时器,分别更新按钮状态和任务状态
def update_run_button_status():
text, interactive = self.get_button_status()
return gr.Button(value=text, interactive=interactive)
gr.Timer(1.0).tick(
fn=self.get_button_status,
fn=update_run_button_status,
outputs=[run_btn]
)
gr.Timer(1.0).tick(
fn=self.get_pause_button_with_interactive,
outputs=[pause_btn]
)
gr.Timer(1.0).tick(
fn=self.update_task_status,
outputs=[task_status]
@ -372,21 +453,34 @@ class KotoneBotUI:
value=None
)
# 创建执行按钮
execute_btn = gr.Button("执行任务")
# 创建执行按钮和暂停按钮
with gr.Row():
execute_btn = gr.Button("执行任务", scale=2)
pause_btn = gr.Button("暂停", scale=1)
task_result = gr.Markdown("")
def toggle_single_task(task_name: str) -> Tuple[str, str]:
def toggle_single_task(task_name: str) -> Tuple[gr.Button, str]:
if self.single_task_running:
return self.stop_single_task()
else:
return self.start_single_task(task_name)
# 如果正在停止过程中,忽略重复点击
if self.is_single_task_stopping:
return gr.Button(value="停止中...", interactive=False), "正在停止任务..."
def get_task_button_status() -> str:
result = self.stop_single_task()
return gr.Button(value=result[0], interactive=False), result[1]
else:
result = self.start_single_task(task_name)
return gr.Button(value=result[0], interactive=True), result[1]
def get_task_button_status() -> gr.Button:
if not hasattr(self, 'run_status') or not self.run_status.running:
self.single_task_running = False
return "执行任务"
return "停止任务"
self.is_single_task_stopping = False # 重置停止状态
return gr.Button(value="执行任务", interactive=True)
if self.is_single_task_stopping:
return gr.Button(value="停止中...", interactive=False) # 停止中时禁用按钮
return gr.Button(value="停止任务", interactive=True)
def get_single_task_status() -> str:
if not hasattr(self, 'run_status'):
@ -419,17 +513,29 @@ class KotoneBotUI:
return ""
def on_pause_click(evt: gr.EventData) -> str:
return self.toggle_pause()
execute_btn.click(
fn=toggle_single_task,
inputs=[task_dropdown],
outputs=[execute_btn, task_result]
)
pause_btn.click(
fn=on_pause_click,
outputs=[pause_btn]
)
# 添加定时器更新按钮状态和任务状态
gr.Timer(1.0).tick(
fn=get_task_button_status,
outputs=[execute_btn]
)
gr.Timer(1.0).tick(
fn=self.get_pause_button_with_interactive,
outputs=[pause_btn]
)
gr.Timer(1.0).tick(
fn=get_single_task_status,
outputs=[task_result]

View File

@ -12,7 +12,7 @@ from util import BaseTestCase
class TestContextInterruptible(BaseTestCase):
def setUp(self):
# 每个测试前重置中断状态
vars.interrupted.clear()
vars.flow.clear_interrupt()
def test_interruptible(self):
# 测试正常函数调用
@ -23,7 +23,7 @@ class TestContextInterruptible(BaseTestCase):
self.assertEqual(test_func(), "success")
# 测试中断情况
vars.interrupted.set()
vars.flow.request_interrupt()
with self.assertRaises(KeyboardInterrupt):
test_func()
@ -37,7 +37,7 @@ class TestContextInterruptible(BaseTestCase):
self.assertEqual(test_func_with_args(1, y=3), 4)
# 测试中断情况
vars.interrupted.set()
vars.flow.request_interrupt()
with self.assertRaises(KeyboardInterrupt):
test_func_with_args(1)
@ -58,7 +58,7 @@ class TestContextInterruptible(BaseTestCase):
self.assertEqual(obj.method2("test"), "method2: test")
# 测试中断情况
vars.interrupted.set()
vars.flow.request_interrupt()
with self.assertRaises(KeyboardInterrupt):
obj.method1()
with self.assertRaises(KeyboardInterrupt):
@ -88,7 +88,7 @@ class TestContextInterruptible(BaseTestCase):
# 等待一小段时间后设置中断标志
time.sleep(0.5)
vars.interrupted.set()
vars.flow.request_interrupt()
# 等待所有线程完成
for t in threads:
@ -98,13 +98,13 @@ class TestContextInterruptible(BaseTestCase):
self.assertTrue(len(exceptions) == 3)
def test_sleep(self):
vars.interrupted.set()
vars.flow.request_interrupt()
with self.assertRaises(KeyboardInterrupt):
sleep(2)
def test_context_ocr_interruptible(self):
# 测试 ContextOcr 的可中断性
vars.interrupted.set()
vars.flow.request_interrupt()
with self.assertRaises(KeyboardInterrupt):
ocr.ocr()
with self.assertRaises(KeyboardInterrupt):
@ -116,7 +116,7 @@ class TestContextInterruptible(BaseTestCase):
def test_context_image_interruptible(self):
# 测试 ContextImage 的可中断性
vars.interrupted.set()
vars.flow.request_interrupt()
with self.assertRaises(KeyboardInterrupt):
image.find("test.png")
with self.assertRaises(KeyboardInterrupt):
@ -132,7 +132,7 @@ class TestContextInterruptible(BaseTestCase):
def test_context_color_interruptible(self):
# 测试 ContextColor 的可中断性
vars.interrupted.set()
vars.flow.request_interrupt()
with self.assertRaises(KeyboardInterrupt):
color.find((255, 255, 255))