feat(task): 优化培育推荐卡检测速度 & 优化部分日常流程

1. 将默认截图控制方式从 adb 修改为 uiautomator2
2. uiautomator2 加入最低截图间隔 0.2s
3. 移除了 923e3b8 中对推荐卡检测逻辑中内层循环展开的优化,因为那部分实际上是负优化。详见 practice() 函数注释。
4. 优化了活動支給和工作在高截图速度下的执行
This commit is contained in:
XcantloadX 2025-02-13 14:21:03 +08:00
parent 19e6fa5167
commit 79f3c222d4
12 changed files with 225 additions and 97 deletions

View File

@ -683,7 +683,7 @@ class Context(Generic[T]):
ip = self.config.current.backend.adb_ip
port = self.config.current.backend.adb_port
# TODO: 处理链接失败情况
self.__device = ContextDevice(create_device(f'{ip}:{port}', 'adb'))
self.__device = ContextDevice(create_device(f'{ip}:{port}', 'uiautomator2'))
def inject(
self,

View File

@ -1,3 +1,4 @@
import time
from typing import Literal
from ..device import Device
@ -7,16 +8,24 @@ import numpy as np
import uiautomator2 as u2
from cv2.typing import MatLike
SCREENSHOT_INTERVAL = 0.2
class UiAutomator2Impl(Screenshotable, Commandable, Touchable):
def __init__(self, device: Device):
self.device = device
self.u2_client = u2.Device(device.adb.serial)
self.__last_screenshot_time = 0
def screenshot(self) -> MatLike:
"""
截图
"""
from kotonebot import sleep
delta = time.time() - self.__last_screenshot_time
if delta < SCREENSHOT_INTERVAL:
sleep(SCREENSHOT_INTERVAL - delta)
image = self.u2_client.screenshot(format='opencv')
self.__last_screenshot_time = time.time()
assert isinstance(image, np.ndarray)
return image

View File

@ -192,11 +192,16 @@ def start(
if __name__ == '__main__':
from kotonebot.tasks.common import BaseConfig
from kotonebot.backend.util import Profiler
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(filename)s:%(lineno)d] - %(message)s')
logger.setLevel(logging.DEBUG)
logging.getLogger('kotonebot').setLevel(logging.DEBUG)
init_context(config_type=BaseConfig)
initialize('kotonebot.tasks')
run(debug=True)
pf = Profiler('profiler')
pf.begin()
run()
pf.end()
pf.snakeviz()

View File

@ -93,8 +93,8 @@ def acquisitions() -> AcquisitionType | None:
logger.info("Acquisition stuffs...")
# 加载画面
logger.debug("Check loading screen...")
if loading():
logger.info("Loading...")
return "Loading"
# P饮料领取
@ -229,7 +229,7 @@ def commut_event():
for button in buttons:
# 冲刺课程,跳过处理
if '重点' in button.title:
break
return False
logger.info(f"Found commu event: {button.title}")
logger.info("Select first choice")
if buttons[0].selected:

View File

@ -1,5 +1,8 @@
import math
import time
import logging
from typing_extensions import deprecated
from itertools import cycle
from typing import Generic, Iterable, Literal, NamedTuple, Callable, Generator, TypeVar, ParamSpec, cast
import cv2
@ -361,7 +364,7 @@ def until_action_scene(week_first: bool = False):
logger.info("Action scene not detected. Retry...")
if acquisitions():
continue
if commut_event():
if week_first and commut_event():
continue
sleep(0.2)
else:
@ -397,29 +400,60 @@ def practice():
# and len(list(filter(lambda x: x >= 0.01, border_scores))) >= 3
)
# 循环打出推荐卡
# 循环打出推荐卡,直到练习结束
wait = cycle([0.1, 0.3, 0.5]) # 见下方解释
tries = 1
while True:
start_time = time.time()
img = device.screenshot()
if image.find(R.Common.ButtonIconCheckMark):
logger.info("Confirmation dialog detected")
device.click()
sleep(3) # 等待卡片刷新
sleep(4) # 等待卡片刷新
continue
card_count = skill_card_count(img)
# cards = obtain_cards(img)
# card_count = len(cards)
# available_cards = [card for card in cards if card.available]
# if len(available_cards) == 1:
# device.double_click(available_cards[0].rect)
# sleep(3) # 等待卡片刷新
# continue
if card_count > 0 and handle_recommended_card(
card_count=card_count,
threshold_predicate=threshold_predicate,
img=img
) is not None:
sleep(3)
# TODO: 需要处理卡片数量为 0 的且当前回合未结束的情况
if card_count > 0:
inner_tries = 0
# 为什么要内层循环:
# 在 adb 截图模式下,如果把 skill_card_count 和 handle_recommended_card
# 放在同一循环中执行,循环一次大约需要 1.4~1.6s
# 游戏中推荐卡的发光周期约为 1s也就是循环一次大约是 1.5 个发光周期。
#
# 假设发光强度符合规律 y=|sin t|t=时间),
# 如果第一次循环恰巧处于发光强度较弱时,
# 那么经过 1.5T 后,依旧是处于发光强度较弱时,
# 这样会导致每次循环都正好是在发光强度较弱时检测,
# 每次都无法达到检测阈值。
#
# 因此:
# 1. 每次循环都等待随机时间
# 2. 将 handle_recommended_card 单独放到内循环中执行,
# 提高检测速度
while True:
result = handle_recommended_card(
card_count=card_count,
threshold_predicate=threshold_predicate,
img=img
)
if result:
break
img = device.screenshot()
inner_tries += 1
if inner_tries >= 5:
break
if result:
logger.info("Handle recommended card success with %d tries", tries)
sleep(4.5)
tries = 0
else:
tries += 1
end_time = time.time()
logger.debug("Handle recommended card time: %.2f", end_time - start_time)
delay = next(wait)
logger.info("Tries: %d, Delay: %.2f", tries, delay)
sleep(delay)
elif (
card_count == 0
and not image.find_multi([
@ -428,7 +462,6 @@ def practice():
])
):
break
sleep(np.random.uniform(0.01, 0.2))
# 结束动画
logger.info("CLEAR/PERFECT not found. Practice finished.")
@ -460,8 +493,9 @@ def exam(type: Literal['mid', 'final']):
else:
return result.score >= 0.10
# 关于上面阈值的解释:
# 两个阈值均指卡片周围的“黄色度”,
# total_threshold 指卡片平均的黄色度阈值border_thresholds 指卡片四边的黄色度阈值
# 所有阈值均指卡片周围的“黄色度”,
# score 指卡片四边的平均黄色度阈值,
# left_score、right_score、top_score、bottom_score 指卡片每边的黄色度阈值
# 为什么期中和期末考试阈值不一样:
# 期末考试的场景为黄昏,背景中含有大量黄色,
@ -470,37 +504,53 @@ def exam(type: Literal['mid', 'final']):
# 这样可以筛选出只有四边都包含黄色的发光卡片,
# 而由夕阳背景造成的假发光卡片通常不会四边都包含黄色。
wait = cycle([0.1, 0.3, 0.5])
tries = 1
while True:
start_time = time.time()
img = device.screenshot()
if image.find(R.Common.ButtonIconCheckMark):
logger.info("Confirmation dialog detected")
device.click()
sleep(3) # 等待卡片刷新
sleep(4) # 等待卡片刷新
continue
card_count = skill_card_count(img)
# cards = obtain_cards(img)
# card_count = len(cards)
# available_cards = [card for card in cards if card.available]
# if len(available_cards) == 1:
# device.double_click(available_cards[0].rect)
# sleep(3) # 等待卡片刷新
# continue
if card_count > 0 and handle_recommended_card(
card_count=card_count,
threshold_predicate=threshold_predicate,
img=img
) is not None:
sleep(3) # 等待卡片刷新
if card_count > 0:
inner_tries = 0
while True:
result = handle_recommended_card(
card_count=card_count,
threshold_predicate=threshold_predicate,
img=img
)
if result:
break
img = device.screenshot()
inner_tries += 1
if inner_tries >= 5:
break
if result:
logger.info("Handle recommended card success with %d tries", tries)
sleep(4.5)
tries = 0
else:
tries += 1
end_time = time.time()
logger.debug("Handle recommended card time: %.2f", end_time - start_time)
delay = next(wait)
logger.info("Tries: %d, Delay: %.2f", tries, delay)
sleep(delay)
# 考试结束
elif (
card_count == 0
and not ocr.find(contains('残りターン'), rect=R.InPurodyuusu.BoxExamTop)
and image.find(R.Common.ButtonNext)
):
# 点击“次へ”
device.click()
break
sleep(np.random.uniform(0.01, 0.1))
# 点击“次へ”
device.click(image.expect_wait(R.Common.ButtonNext))
if type == 'final':
while ocr.wait_for(contains("メモリー"), timeout=7):
device.click_center()
@ -570,23 +620,41 @@ def produce_end():
sleep(1)
# 结算完毕
# logger.info("Finalize")
# # [screenshots/produce_end/end_next_1.jpg]
# logger.debug("Click next 1")
# device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
# sleep(1.3)
# # [screenshots/produce_end/end_next_2.png]
# logger.debug("Click next 2")
# device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
# sleep(1.3)
# # [screenshots/produce_end/end_next_3.png]
# logger.debug("Click next 3")
# device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
# sleep(1.3)
# # [screenshots/produce_end/end_complete.png]
# logger.debug("Click complete")
# device.click(image.expect_wait(R.InPurodyuusu.ButtonComplete))
# sleep(1.3)
# 四个完成画面
logger.info("Finalize")
# [screenshots/produce_end/end_next_1.jpg]
logger.debug("Click next 1")
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
sleep(1.3)
# [screenshots/produce_end/end_next_2.png]
logger.debug("Click next 2")
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
sleep(1.3)
# [screenshots/produce_end/end_next_3.png]
logger.debug("Click next 3")
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
sleep(1.3)
# [screenshots/produce_end/end_complete.png]
logger.debug("Click complete")
device.click(image.expect_wait(R.InPurodyuusu.ButtonComplete))
sleep(1.3)
while True:
# [screenshots/produce_end/end_next_1.jpg]
# [screenshots/produce_end/end_next_2.png]
# [screenshots/produce_end/end_next_3.png]
if image.find(R.InPurodyuusu.ButtonNextNoIcon):
logger.debug("Click next")
device.click()
sleep(0.2)
# [screenshots/produce_end/end_complete.png]
elif image.find(R.InPurodyuusu.ButtonComplete):
logger.debug("Click complete")
device.click(image.expect_wait(R.InPurodyuusu.ButtonComplete))
sleep(0.2)
break
# 点击结束后可能还会弹出来:
# 活动进度、关注提示
# [screenshots/produce_end/end_activity.png]
@ -612,8 +680,8 @@ def produce_end():
sleep(1)
logger.info("Produce completed.")
def week_normal():
until_action_scene()
def week_normal(week_first: bool = False):
until_action_scene(week_first)
executed_action = handle_recommended_action()
logger.info("Executed recommended action: %s", executed_action)
# 推荐练习
@ -636,6 +704,7 @@ def week_normal():
until_action_scene()
def week_final_lesson():
until_action_scene()
if handle_recommended_action(final_week=True) != 'lesson':
raise ValueError("Failed to enter recommended action on final week.")
sleep(5)
@ -674,7 +743,7 @@ def hajime_regular(week: int = -1, start_from: int = 1):
:param start_from: 从第几周开始从1开始
"""
weeks = [
week_normal, # 1: Vo.レッスン、Da.レッスン、Vi.レッスン
lambda: week_normal(True), # 1: Vo.レッスン、Da.レッスン、Vi.レッスン
week_normal, # 2: 授業
week_normal, # 3: Vo.レッスン、Da.レッスン、Vi.レッスン、授業
week_normal, # 4: おでかけ、相談、活動支給
@ -707,7 +776,7 @@ def hajime_pro(week: int = -1, start_from: int = 1):
:param start_from: 从第几周开始从1开始
"""
weeks = [
week_normal, # 1
lambda: week_normal(True), # 1
week_normal, # 2
week_normal, # 3
week_normal, # 4
@ -842,6 +911,28 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
getLogger('kotonebot').setLevel(logging.DEBUG)
getLogger(__name__).setLevel(logging.DEBUG)
import os
from datetime import datetime
os.makedirs('logs', exist_ok=True)
log_filename = datetime.now().strftime('logs/task-%y-%m-%d-%H-%M-%S.log')
file_handler = logging.FileHandler(log_filename, encoding='utf-8')
file_handler.setFormatter(logging.Formatter('[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s'))
logging.getLogger().addHandler(file_handler)
from kotonebot.backend.util import Profiler
from kotonebot.backend.context import init_context, manual_context
init_context()
manual_context().begin()
# hajime_regular(start_from=1)
# pf = Profiler('profiler')
# pf.begin()
# # do_produce(conf().produce.idols[0], 'pro')
# practice()
# hajime_pro(start_from=16)
# pf.end()
# pf.snakeviz()
# while True:

View File

@ -72,38 +72,31 @@ def enter_allowance():
执行活動支給
前置条件位于行动页面且所有行动按钮清晰可见 \n
结束状态
结束状态位于行动页面
"""
logger.info("Executing 活動支給.")
# 点击活動支給 [screenshots\allowance\step_1.png]
logger.info("Double clicking on 活動支給.")
device.double_click(image.expect(R.InPurodyuusu.ButtonTextAllowance), interval=1)
# 等待进入页面
wait_loading_end()
# 处理可能会出现的支援卡奖励
while not image.find(R.InPurodyuusu.IconTitleAllowance):
logger.debug("Waiting for 活動支給 screen.")
acquisitions()
# 第一个箱子 [screenshots\allowance\step_2.png]
logger.info("Clicking on the first lootbox.")
device.click(image.expect_wait_any([
R.InPurodyuusu.LootboxSliverLock
]))
while acquisitions() is None:
logger.info("Waiting for acquisitions finished.")
# 第二个箱子
logger.info("Clicking on the second lootbox.")
device.click(image.expect_wait_any([
R.InPurodyuusu.LootboxSliverLock
]))
while acquisitions() is None:
logger.info("Waiting for acquisitions finished.")
# 领取奖励
while True:
# TODO: 检测是否在行动页面应当单独一个函数
if image.find_multi([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
]):
break
if image.find(R.InPurodyuusu.LootboxSliverLock):
logger.info("Click on lootbox.")
device.click()
continue
if acquisitions() is not None:
continue
logger.info("活動支給 completed.")
# wait_loading_start() # 可能会因为加载太快,截图没截到,导致抛出异常
sleep(1)
wait_loading_end()
# 可能会出现的新动画
# 技能卡:[screenshots\allowance\step_4.png]
@action('判断是否可以休息')
def is_rest_available():

View File

@ -12,7 +12,7 @@ from kotonebot import task, device, image, action, ocr, contains, cropped, rect_
logger = logging.getLogger(__name__)
@action('领取工作奖励')
def acquire_assignment():
def handle_claim_assignment():
"""
领取工作奖励
@ -20,9 +20,10 @@ def acquire_assignment():
结束状态分配工作页面
"""
# 领取奖励 [screenshots/assignment/acquire.png]
while image.wait_for(R.Common.ButtonCompletion, timeout=5):
if image.find(R.Common.ButtonCompletion):
device.click()
sleep(5)
return True
return False
@action('重新分配工作')
def assign(type: Literal['mini', 'online']) -> bool:
@ -124,6 +125,18 @@ def get_remaining_time() -> timedelta | None:
logger.info(f'お仕事 remaining time: {time}')
return timedelta(hours=time.numbers()[0], minutes=time.numbers()[1], seconds=time.numbers()[2])
@action('检测工作页面')
def at_assignment():
"""
判断是否在工作页面
"""
# 不能以 R.Daily.IconTitleAssign 作为判断依据,
# 因为标题出现后还有一段动画
return image.find_multi([
R.Daily.ButtonAssignmentShortenTime,
R.Daily.IconAssignMiniLive,
R.Daily.IconAssignOnlineLive,
]) is not None
@task('工作')
def assignment():
@ -149,26 +162,24 @@ def assignment():
# 点击工作按钮
logger.debug('Clicking assignment icon.')
device.click(btn_assignment)
# 加载页面等待
wait_loading_end()
if completed:
acquire_assignment()
logger.info('Assignment acquired.')
# 领取完后会自动进入分配页面
image.expect_wait(R.Daily.IconTitleAssign)
# 等待加载、领取奖励
while not at_assignment():
if completed and handle_claim_assignment():
logger.info('Assignment acquired.')
# 重新分配
if conf().assignment.mini_live_reassign_enabled:
if image.find(R.Daily.IconAssignMiniLive):
assign('mini')
sleep(6) # 等待动画结束
# TODO: 更好的方法来等待动画结束。
else:
logger.info('MiniLive reassign is disabled.')
if conf().assignment.online_live_reassign_enabled:
if image.find(R.Daily.IconAssignOnlineLive):
assign('online')
sleep(6) # 等待动画结束
else:
logger.info('OnlineLive reassign is disabled.')
# 等待动画结束
while not at_assignment():
pass
if __name__ == '__main__':
import logging

View File

@ -144,6 +144,8 @@ class CommuEventButtonUI:
"""
img = device.screenshot()
rects = filter_rectangles(img, (WHITE_LOW, WHITE_HIGH), 7, 500, rect=self.rect)
if not rects:
return []
selected = self.selected()
result: list[EventButton] = []
for rect in rects:

View File

@ -218,6 +218,22 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
logging.getLogger('kotonebot').setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
do_produce(conf().produce.idols[0], 'pro')
import os
from datetime import datetime
os.makedirs('logs', exist_ok=True)
log_filename = datetime.now().strftime('logs/task-%y-%m-%d-%H-%M-%S.log')
file_handler = logging.FileHandler(log_filename, encoding='utf-8')
file_handler.setFormatter(logging.Formatter('[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s'))
logging.getLogger().addHandler(file_handler)
from kotonebot.backend.context import init_context
from kotonebot.tasks.common import BaseConfig
init_context(config_type=BaseConfig)
from kotonebot.backend.util import Profiler
pf = Profiler('profiler')
pf.begin()
do_produce(conf().produce.idols[0], 'regular')
pf.end()
pf.snakeviz()
# a()
# select_idol(PIdol.藤田ことね_学園生活)

View File

@ -23,7 +23,7 @@ def start_game():
logger.info("Not at home, going to home")
goto_home()
return
device.start_app('com.bandainamcoent.idolmaster_gakuen')
device.launch_app('com.bandainamcoent.idolmaster_gakuen')
# [screenshots/startup/1.png]
image.wait_for(R.Daily.ButonLinkData, timeout=30)
sleep(2)

Binary file not shown.

After

Width:  |  Height:  |  Size: 392 KiB

View File

@ -0,0 +1 @@
{"definitions":{"1652f06a-5417-49ef-8949-4854772d9ab7":{"name":"Daily.ButtonAssignmentShortenTime","displayName":"工作页面 短缩 时间","type":"template","annotationId":"1652f06a-5417-49ef-8949-4854772d9ab7","useHintRect":false}},"annotations":[{"id":"1652f06a-5417-49ef-8949-4854772d9ab7","type":"rect","data":{"x1":461,"y1":1025,"x2":582,"y2":1060}}]}