feat(task): 优化了部分日常流程

1. SimpleDispatcher 增加 timeout 功能
2. 修复 ContextStackVars 中 screenshot 属性无法自动继承
3. 优化收取活动费、工作、竞赛、购买在 uiautomator2 下的表现
This commit is contained in:
XcantloadX 2025-02-14 10:37:43 +08:00
parent 4852f3a318
commit 95f4111647
8 changed files with 90 additions and 52 deletions

View File

@ -199,7 +199,15 @@ class ContextStackVars:
@property
def screenshot(self) -> MatLike:
match self.screenshot_mode:
case 'manual' | 'manual-inherit':
case 'manual':
if self._screenshot is None:
raise ValueError("No screenshot data found.")
return self._screenshot
case 'manual-inherit':
# TODO: 这一部分要考虑和 device.screenshot() 合并
if self._inherit_screenshot is not None:
self._screenshot = self._inherit_screenshot
self._inherit_screenshot = None
if self._screenshot is None:
raise ValueError("No screenshot data found.")
return self._screenshot

View File

@ -247,15 +247,17 @@ class SimpleDispatcher:
self.finished: bool = False
self.result: Any | None = None
self.interval = interval
self.timeout_value: float | None = None
self.timeout_critical: bool = False
self.__last_run_time: float = 0
def click(
self,
target: Image | StringMatchFunction | Literal['center'] | Rect,
*,
finish: bool = False,
log: str | None = None
):
self,
target: Image | StringMatchFunction | Literal['center'] | Rect,
*,
finish: bool = False,
log: str | None = None
):
params = ClickParams(finish=finish, log=log)
if isinstance(target, Image):
self.blocks.append(ClickImage(self, target, params=params))
@ -281,18 +283,24 @@ class SimpleDispatcher:
return self
def until(
self,
text: StringMatchFunction | Image,
*,
rect: Rect | None = None,
result: Any | None = None
):
self,
text: StringMatchFunction | Image,
*,
rect: Rect | None = None,
result: Any | None = None
):
if isinstance(text, Image):
self.blocks.append(UntilImage(self, text, rect=rect, result=result))
else:
self.blocks.append(UntilText(self, text, rect=rect, result=result))
return self
def timeout(self, timeout: float, *, critical: bool = False, result: Any | None = None):
self.timeout_value = timeout
self.timeout_critical = critical
self.timeout_result = result
return self
def run(self):
from kotonebot import device, sleep
while True:
@ -305,6 +313,13 @@ class SimpleDispatcher:
if self.finished:
break
self.__last_run_time = time.time()
if self.timeout_value and time.time() - self.__last_run_time > self.timeout_value:
if self.timeout_critical:
raise TimeoutError(f'Dispatcher "{self.name}" timed out.')
else:
self.logger.warning(f'Dispatcher "{self.name}" timed out.')
self.result = self.timeout_result
break
device.screenshot()
return self.result

View File

@ -336,7 +336,9 @@ class Device:
:return: 前台 APP 的包名如果获取失败则返回 None
:exception: 如果设备不支持此功能则抛出 NotImplementedError
"""
return self._command.current_package()
ret = self._command.current_package()
logger.debug("current_package: %s", ret)
return ret
def detect_orientation(self) -> Literal['portrait', 'landscape'] | None:
"""

View File

@ -24,6 +24,9 @@ def acquire_activity_funds():
logger.info('Activity funds claimed.')
else:
logger.info('No activity funds to claim.')
while not at_home():
pass
if __name__ == '__main__':
import logging

View File

@ -172,6 +172,8 @@ def assignment():
assign('mini')
else:
logger.info('MiniLive reassign is disabled.')
while not at_assignment():
pass
if conf().assignment.online_live_reassign_enabled:
if image.find(R.Daily.IconAssignOnlineLive):
assign('online')

View File

@ -2,11 +2,13 @@
import logging
from gettext import gettext as _
from kotonebot.backend.dispatch import SimpleDispatcher
from . import R
from .common import conf
from .actions.scenes import at_home, goto_home
from .actions.loading import wait_loading_end
from kotonebot import device, image, ocr, color, action, task, user, rect_expand, sleep
from kotonebot import device, image, ocr, color, action, task, user, rect_expand, sleep, contains
logger = logging.getLogger(__name__)
@ -18,14 +20,12 @@ def goto_contest() -> bool:
:return: 是否存在未完成的挑战
"""
# TODO: 优化这一部分,加入区域检测,提高速度
device.click(image.expect(R.Common.ButtonContest))
sleep(0.5)
btn_contest = image.expect_wait(R.Daily.TextContest, colored=True, transparent=True, threshold=0.9999)
sleep(0.2)
ocr.expect_wait(contains('CONTEST'))
btn_contest = ocr.expect_wait(contains('コンテスト'))
has_ongoing_contest = image.find(R.Daily.TextContestLastOngoing) is not None
device.click(btn_contest)
sleep(0.5)
wait_loading_end()
if not has_ongoing_contest:
while not image.find(R.Daily.ButtonContestRanking):
# [screenshots/contest/acquire1.png]

View File

@ -7,7 +7,7 @@ from . import R
from .common import conf, DailyMoneyShopItems
from kotonebot.backend.util import cropped
from kotonebot import task, device, image, ocr, action, sleep
from kotonebot.backend.dispatch import DispatcherContext, dispatcher
from kotonebot.backend.dispatch import DispatcherContext, SimpleDispatcher, dispatcher
from .actions.scenes import goto_home, goto_shop, at_daily_shop
logger = logging.getLogger(__name__)
@ -38,10 +38,10 @@ def money_items2(items: Optional[list[DailyMoneyShopItems]] = None):
scroll = 0
while items:
for item in items:
if image.find(item.to_resource()):
if image.find(item.to_resource(), colored=True):
logger.info(f'Purchasing {item.to_ui_text(item)}...')
device.click()
dispatch_purchase_dialog()
handle_purchase_dialog()
finished.append(item)
items = [item for item in items if item not in finished]
# 全都买完了
@ -57,10 +57,10 @@ def money_items2(items: Optional[list[DailyMoneyShopItems]] = None):
break
logger.info(f'Purchasing money items completed. {len(finished)} item(s) purchased.')
if items:
logger.info(f'{len(items)} item(s) not purchased: {", ".join([item.to_ui_text(item) for item in items])}')
logger.info(f'{len(items)} item(s) not purchased/already purchased: {", ".join([item.to_ui_text(item) for item in items])}')
@action('购买推荐商品', dispatcher=True)
def dispatch_recommended_items(ctx: DispatcherContext):
@action('购买推荐商品', screenshot_mode='manual-inherit')
def dispatch_recommended_items():
"""
购买推荐商品
@ -68,20 +68,19 @@ def dispatch_recommended_items(ctx: DispatcherContext):
结束状态-
"""
# 前置条件:[screenshots\shop\money1.png]
if ctx.beginning:
logger.info(f'Start purchasing recommended items.')
if image.find(R.Daily.TextShopRecommended):
logger.info(f'Clicking on recommended item.') # TODO: 计数
device.click()
elif ctx.expand(dispatch_purchase_dialog):
pass
elif image.find(R.Daily.IconTitleDailyShop) and not image.find(R.Daily.TextShopRecommended):
logger.info(f'No recommended item found. Finished.')
ctx.finish()
logger.info(f'Start purchasing recommended items.')
@action('确认购买', dispatcher=True)
def dispatch_purchase_dialog(ctx: DispatcherContext):
while True:
if image.find(R.Daily.TextShopRecommended):
logger.info(f'Clicking on recommended item.') # TODO: 计数
device.click()
handle_purchase_dialog()
elif image.find(R.Daily.IconTitleDailyShop) and not image.find(R.Daily.TextShopRecommended):
logger.info(f'No recommended item found. Finished.')
break
@action('确认购买', screenshot_mode='manual-inherit')
def handle_purchase_dialog():
"""
确认购买
@ -89,18 +88,28 @@ def dispatch_purchase_dialog(ctx: DispatcherContext):
结束状态对话框关闭后原来的界面
"""
# 前置条件:[screenshots\shop\dialog.png]
device.screenshot()
if image.find(R.Daily.ButtonShopCountAdd, colored=True):
logger.debug('Adjusting quantity(+1)...')
device.click()
elif image.find(R.Common.ButtonConfirm):
sleep(0.1)
logger.debug('Confirming purchase...')
device.click()
ctx.finish()
elif image.find(R.Daily.TextShopPurchased):
# TODO: 需要有个更好的方式检测是否已购买
purchased = (SimpleDispatcher('dispatch_purchase_dialog')
.until(R.Common.ButtonConfirm, result=False)
.until(R.Daily.TextShopPurchased, result=True)
.timeout(timeout=3, result=True)
).run()
if purchased:
logger.info('Item sold out.')
ctx.finish()
sleep(1) # 等待售罄提示消失
return
else:
device.screenshot()
while image.find(R.Daily.ButtonShopCountAdd, colored=True):
logger.debug('Adjusting quantity(+1)...')
device.click()
sleep(0.2)
device.screenshot()
logger.debug('Confirming purchase...')
device.click(image.expect_wait(R.Common.ButtonConfirm))
# 等待对话框动画结束
image.expect_wait(R.Daily.IconTitleDailyShop)
@action('购买 AP 物品')
def ap_items():

View File

@ -182,6 +182,7 @@ class KotoneBotUI:
try:
save_config(self.config, "config.json")
gr.update(visible=True)
return "设置已保存!"
except Exception as e:
return f"保存设置失败:{str(e)}"
@ -528,9 +529,7 @@ class KotoneBotUI:
self.current_config = self.config.user_configs[0]
def create_ui(self) -> gr.Blocks:
# 每次创建 UI 时重新加载配置
self._load_config()
with gr.Blocks(title="KotoneBot Assistant", css="#container { max-width: 800px; margin: auto; padding: 20px; }") as app:
with gr.Column(elem_id="container"):
gr.Markdown("# KotoneBot Assistant")