refactor(task): 重构竞赛

Fix #33 #13
This commit is contained in:
XcantloadX 2025-05-23 12:07:55 +08:00
parent ac3be6a95f
commit dbe9caec23
3 changed files with 127 additions and 92 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 979 KiB

View File

@ -0,0 +1 @@
{"definitions":{"4503db6b-7224-4b81-9971-e7cfa56e10f2":{"name":"Daily.TextRoadToIdol","displayName":"文字「アイドルへの道」","type":"template","annotationId":"4503db6b-7224-4b81-9971-e7cfa56e10f2","useHintRect":false},"060da12a-2ed2-488d-8d3a-bbe5d171b5d2":{"name":"Daily.PointContest","displayName":"进入竞赛的按钮","type":"hint-point","annotationId":"060da12a-2ed2-488d-8d3a-bbe5d171b5d2","useHintRect":false}},"annotations":[{"id":"4503db6b-7224-4b81-9971-e7cfa56e10f2","type":"rect","data":{"x1":84,"y1":894,"x2":204,"y2":959}},{"id":"060da12a-2ed2-488d-8d3a-bbe5d171b5d2","type":"point","data":{"x":492,"y":878}}]}

View File

@ -7,11 +7,11 @@ from kotonebot.kaa.common import conf
from kotonebot.kaa.game_ui import WhiteFilter
from ..actions.scenes import at_home, goto_home
from ..actions.loading import wait_loading_end
from kotonebot import device, image, ocr, color, action, task, user, rect_expand, sleep, contains
from kotonebot import device, image, ocr, color, action, task, user, rect_expand, sleep, contains, Interval
logger = logging.getLogger(__name__)
@action('前往竞赛页面')
@action('前往竞赛页面', screenshot_mode='manual-inherit')
def goto_contest() -> bool:
"""
前置条件位于首页 \n
@ -19,25 +19,96 @@ def goto_contest() -> bool:
:return: 是否存在未完成的挑战
"""
# TODO: 优化这一部分,加入区域检测,提高速度
device.click(image.expect(R.Common.ButtonContest))
ocr.expect_wait(contains('CONTEST'))
btn_contest = ocr.expect_wait(contains('コンテスト'))
has_ongoing_contest = image.find(R.Daily.TextContestLastOngoing) is not None
device.click(btn_contest)
if not has_ongoing_contest:
while not image.find(R.Daily.ButtonContestRanking):
it = Interval()
has_ongoing_contest = None
while True:
device.screenshot()
it.wait()
if image.find(R.Common.ButtonContest):
device.click()
elif image.find(R.Daily.TextRoadToIdol):
# 已进入竞赛 Tab
if image.find(R.Daily.TextContestLastOngoing):
logger.info('Ongoing contest found.')
has_ongoing_contest = True
else:
has_ongoing_contest = False
# 点击进入竞赛页面
logger.debug('Clicked on Contest.')
device.click(R.Daily.PointContest)
continue
# 有未完成的挑战
if has_ongoing_contest is True:
if image.find(R.Daily.ButtonContestChallengeStart):
logger.info('Challenging.')
break
# 新开挑战
elif has_ongoing_contest is False:
if image.find(R.Daily.ButtonContestRanking):
logger.info('Now at pick contestant screen.')
break
# 跳过奖励领取
# [kotonebot-resource\sprites\jp\daily\screenshot_contest_season_reward.png]
# [screenshots/contest/acquire2.png]
device.click(R.Daily.PointDissmissContestReward)
sleep(1)
# [screenshots/contest/main.png]
else:
image.expect_wait(R.Daily.ButtonContestChallengeStart)
return has_ongoing_contest
@action('选择并挑战')
def pick_and_contest(has_ongoing_contest: bool = False) -> bool:
@action('处理竞赛挑战', screenshot_mode='manual-inherit')
def handle_challenge() -> bool:
"""
前置条件- \n
结束状态位于竞赛选择对手界面
:return: 是否命中任何处理
"""
# 挑战开始 [screenshots/contest/start1.png]
if image.find(R.Daily.ButtonContestStart, threshold=0.75): # TODO: 为什么默认阈值找不到?
logger.debug('Clicking on start button.')
device.click()
# 记忆未编成 [screenshots/contest/no_memo.png]
if image.find(R.Daily.TextContestNoMemory):
logger.debug('Memory not set. Using auto-compilation.')
user.warning('竞赛未编成', _('记忆未编成。将使用自动编成。'), once=True)
if image.find(R.Daily.ButtonContestChallenge):
device.click()
return True
# 勾选跳过所有
# [screenshots/contest/contest2.png]
if image.find(R.Common.CheckboxUnchecked, colored=True):
logger.debug('Checking skip all.')
device.click()
return True
# 跳过所有
# [screenshots/contest/contest1.png]
if image.find(R.Daily.ButtonIconSkip, preprocessors=[WhiteFilter()]):
logger.debug('Skipping all.')
device.click()
return True
if image.find(R.Common.ButtonNextNoIcon):
logger.debug('Clicking on next.')
device.click()
# 終了 [screenshots/contest/after_contest3.png]
if image.find(R.Common.ButtonEnd):
logger.debug('Clicking on end.')
device.click()
return True
# 可能出现的奖励弹窗 [screenshots/contest/after_contest4.png]
if image.find(R.Common.ButtonClose):
logger.debug('Clicking on close.')
device.click()
return False
@action('选择对手')
def handle_pick_contestant(has_ongoing_contest: bool = False) -> tuple[bool, bool]:
"""
选择并挑战
@ -45,77 +116,37 @@ def pick_and_contest(has_ongoing_contest: bool = False) -> bool:
结束状态位于竞赛界面
:param has_ongoing_contest: 是否有中断未完成的挑战
:return: 如果返回假说明今天挑战次数已经用完了
:return: 二元组第一个值表示是否命中任何处理
第二个值表示是否应该继续挑战 False 表示今天挑战次数已经用完了
"""
# 判断是否有中断未完成的挑战
# [screenshots/contest/ongoing.png]
if not has_ongoing_contest:
image.expect_wait(R.Daily.ButtonContestRanking)
sleep(3) # 等待动画
# 随机选一个对手 [screenshots/contest/main.png]
logger.debug('Clicking on contestant.')
contestant_list = image.find_all(R.Daily.TextContestOverallStats)
if contestant_list is None or len(contestant_list) == 0:
logger.info('No contestant found. Today\'s challenge points used up.')
return False
# 按照y坐标从上到下排序对手列表
contestant_list.sort(key=lambda x: x.position[1])
if len(contestant_list) != 3:
logger.warning('Cannot find all 3 contestants.')
# 选择配置文件中对应的对手顺序1最强3最弱
target = conf().contest.select_which_contestant
if target >= 1 and target <= 3 and target <= len(contestant_list):
target -= 1 # [1, 3]映射至[0, 2]
else:
target = 0 # 出错则默认选择第一个
contestant = contestant_list[target]
logger.info('Picking up contestant #%d.', target + 1)
device.click(contestant)
# 挑战开始 [screenshots/contest/start1.png]
logger.debug('Clicking on start button.')
device.click(image.expect_wait(R.Daily.ButtonContestStart))
sleep(3) # 多延迟一点
# 进入挑战页面 [screenshots/contest/contest1.png]
# [screenshots/contest/contest2.png]
while not image.find(R.Daily.ButtonContestChallengeStart):
# 记忆未编成 [screenshots/contest/no_memo.png]
if image.find(R.Daily.TextContestNoMemory):
logger.debug('Memory not set. Using auto-compilation.')
user.warning('竞赛未编成', _('记忆未编成。将使用自动编成。'), once=True)
device.click(image.expect(R.Daily.ButtonContestChallenge))
logger.debug('Waiting for challenge start screen.')
# 勾选跳过所有
if image.find(R.Common.CheckboxUnchecked):
logger.debug('Checking skip all.')
device.click()
sleep(0.5)
# 点击 SKIP
logger.debug('Clicking on SKIP.')
device.click(image.expect_wait(R.Daily.ButtonIconSkip, timeout=10, preprocessors=[WhiteFilter()]))
while not image.wait_for(R.Common.ButtonNextNoIcon, timeout=2):
device.click_center()
logger.debug('Waiting for the result.')
# [screenshots/contest/after_contest1.png]
# 点击 次へ [screenshots/contest/after_contest2.png]
logger.debug('Challenge finished. Clicking on next.')
device.click()
# 点击 終了 [screenshots/contest/after_contest3.png]
logger.debug('Clicking on end.')
device.click(image.expect_wait(R.Common.ButtonEnd))
# 可能出现的奖励弹窗 [screenshots/contest/after_contest4.png]
sleep(1)
if image.find(R.Common.ButtonClose):
logger.debug('Clicking on close.')
device.click()
# 等待返回竞赛界面
wait_loading_end()
image.expect_wait(R.Daily.ButtonContestRanking)
logger.info('Challenge finished.')
return True
if image.find(R.Daily.ButtonContestRanking):
# 无进行中挑战,说明要选择对手
if not has_ongoing_contest:
# 随机选一个对手 [screenshots/contest/main.png]
logger.debug('Clicking on contestant.')
contestant_list = image.find_all(R.Daily.TextContestOverallStats)
if contestant_list is None or len(contestant_list) == 0:
logger.info('No contestant found. Today\'s challenge points used up.')
return True, False
# 按照y坐标从上到下排序对手列表
contestant_list.sort(key=lambda x: x.position[1])
if len(contestant_list) != 3:
logger.warning('Cannot find all 3 contestants.')
# 选择配置文件中对应的对手顺序1最强3最弱
target = conf().contest.select_which_contestant
if target >= 1 and target <= 3 and target <= len(contestant_list):
target -= 1 # [1, 3]映射至[0, 2]
else:
target = 0 # 出错则默认选择第一个
contestant = contestant_list[target]
logger.info('Picking up contestant #%d.', target + 1)
device.click(contestant)
sleep(2)
return True, True
return False, True
@task('竞赛')
def contest():
""""""
if not conf().contest.enabled:
logger.info('Contest is disabled.')
return
@ -129,17 +160,20 @@ def contest():
logger.info('No action needed.')
return
has_ongoing_contest = goto_contest()
while pick_and_contest(has_ongoing_contest):
sleep(1.3)
while True:
device.screenshot()
handled, should_continue = handle_pick_contestant(has_ongoing_contest)
if not should_continue:
break
if not handled:
handled = handle_challenge()
if not handled:
device.click(10, 10)
has_ongoing_contest = False
goto_home()
logger.info('Contest all finished.')
if __name__ == '__main__':
import logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
logging.getLogger('kotonebot').setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
contest()
from kotonebot.kaa.main import Kaa
from kotonebot.backend.context import tasks_from_id
Kaa('config.json').run(tasks_from_id(['contest']))