feat(task): 调整部分延时 & 新增对进行中的竞赛的支持

This commit is contained in:
XcantloadX 2025-01-16 12:13:41 +08:00
parent 9222b66cc1
commit 9ee39b0804
9 changed files with 157 additions and 57 deletions

13
.vscode/launch.json vendored
View File

@ -32,6 +32,19 @@
"-c",
"${command:extension.commandvariable.file.relativeDirDots}.${fileBasenameNoExtension}"
]
},
{
"name": "KotonebotDebug: All tasks",
"type": "debugpy",
"request": "launch",
"console": "integratedTerminal",
"module": "kotonebot.backend.debug.entry",
"args": [
"-s",
"${workspaceFolder}/dumps",
"-c",
"kotonebot.run.run"
]
}
]
}

View File

@ -9,6 +9,7 @@ from .backend.context import (
image,
debug,
color,
rect_expand
)
from .backend.util import (
Rect,
@ -22,6 +23,8 @@ from .backend.util import (
y,
cropped,
UnrecoverableError,
AdaptiveWait,
)
from .backend.core import task, action
from .ui import user

View File

@ -34,6 +34,8 @@ from kotonebot.backend.color import find_rgb
from kotonebot.backend.ocr import Ocr, OcrResult, jp, en, StringMatchFunction
OcrLanguage = Literal['jp', 'en']
DEFAULT_TIMEOUT = 120
DEFAULT_INTERVAL = 0.4
# https://stackoverflow.com/questions/74714300/paramspec-for-a-pre-defined-function-without-using-generic-callablep
T = TypeVar('T')
@ -160,7 +162,13 @@ class ContextOcr:
self.context.device.last_find = ret
return ret
def expect_wait(self, pattern: str | re.Pattern | StringMatchFunction, timeout: float = 10) -> OcrResult:
def expect_wait(
self,
pattern: str | re.Pattern | StringMatchFunction,
timeout: float = DEFAULT_TIMEOUT,
*,
interval: float = DEFAULT_INTERVAL
) -> OcrResult:
"""
等待指定文本出现
"""
@ -172,9 +180,15 @@ class ContextOcr:
return result
if time.time() - start_time > timeout:
raise TimeoutError(f"Timeout waiting for {pattern}")
time.sleep(0.1)
time.sleep(interval)
def wait_for(self, pattern: str | re.Pattern | StringMatchFunction, timeout: float = 10) -> OcrResult | None:
def wait_for(
self,
pattern: str | re.Pattern | StringMatchFunction,
timeout: float = DEFAULT_TIMEOUT,
*,
interval: float = DEFAULT_INTERVAL
) -> OcrResult | None:
"""
等待指定文本出现
"""
@ -186,7 +200,7 @@ class ContextOcr:
return result
if time.time() - start_time > timeout:
return None
time.sleep(0.1)
time.sleep(interval)
class ContextImage:
@ -202,11 +216,11 @@ class ContextImage:
template: MatLike | str,
mask: MatLike | str | None = None,
threshold: float = 0.9,
timeout: float = 10,
timeout: float = DEFAULT_TIMEOUT,
colored: bool = False,
interval: float = 0.1,
*,
transparent: bool = False
transparent: bool = False,
interval: float = DEFAULT_INTERVAL,
) -> TemplateMatchResult | None:
"""
等待指定图像出现
@ -226,10 +240,11 @@ class ContextImage:
templates: list[str],
masks: list[str | None] | None = None,
threshold: float = 0.9,
timeout: float = 10,
timeout: float = DEFAULT_TIMEOUT,
colored: bool = False,
*,
transparent: bool = False
transparent: bool = False,
interval: float = DEFAULT_INTERVAL
):
"""
等待指定图像中的任意一个出现
@ -245,17 +260,18 @@ class ContextImage:
return True
if time.time() - start_time > timeout:
return False
time.sleep(0.1)
time.sleep(interval)
def expect_wait(
self,
template: str,
mask: str | None = None,
threshold: float = 0.9,
timeout: float = 10,
timeout: float = DEFAULT_TIMEOUT,
colored: bool = False,
*,
transparent: bool = False
transparent: bool = False,
interval: float = DEFAULT_INTERVAL
) -> TemplateMatchResult:
"""
等待指定图像出现
@ -268,17 +284,18 @@ class ContextImage:
return ret
if time.time() - start_time > timeout:
raise TimeoutError(f"Timeout waiting for {template}")
time.sleep(0.1)
time.sleep(interval)
def expect_wait_any(
self,
templates: list[str],
masks: list[str | None] | None = None,
threshold: float = 0.9,
timeout: float = 10,
timeout: float = DEFAULT_TIMEOUT,
colored: bool = False,
*,
transparent: bool = False
transparent: bool = False,
interval: float = DEFAULT_INTERVAL
) -> TemplateMatchResult:
"""
等待指定图像中的任意一个出现
@ -296,7 +313,7 @@ class ContextImage:
return ret
if time.time() - start_time > timeout:
raise TimeoutError(f"Timeout waiting for any of {templates}")
time.sleep(0.1)
time.sleep(interval)
@context(expect)
def expect(self, *args, **kwargs):
@ -420,6 +437,12 @@ class Context:
def debug(self) -> 'ContextDebug':
return self.__debug
def rect_expand(rect: Rect, left: int = 0, top: int = 0, right: int = 0, bottom: int = 0) -> Rect:
"""
向四个方向扩展矩形区域
"""
return (rect[0] - left, rect[1] - top, rect[2] + right, rect[3] + bottom)
# 暴露 Context 的属性到模块级别
_c: Context | None = None
device: DeviceABC = cast(DeviceABC, Forwarded(name="device"))

View File

@ -1,6 +1,8 @@
from functools import lru_cache
import re
import time
import typing
from time import sleep
from functools import lru_cache
from typing import Literal, NamedTuple, Callable, TYPE_CHECKING
import cv2
@ -170,3 +172,43 @@ def grayscaled(img: MatLike | str) -> MatLike:
def grayscale_cached(img: MatLike | str) -> MatLike:
return grayscaled(img)
class AdaptiveWait:
"""
自适应延时延迟时间会随着时间逐渐增加直到达到最大延迟时间
"""
def __init__(
self,
base_interval: float = 0.5,
max_interval: float = 10,
*,
timeout: float = -1,
timeout_message: str = "Timeout",
factor: float = 1.15,
):
self.base_interval = base_interval
self.max_interval = max_interval
self.interval = base_interval
self.factor = factor
self.timeout = timeout
self.start_time: float | None = time.time()
self.timeout_message = timeout_message
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.reset()
def __call__(self):
if self.start_time is None:
self.start_time = time.time()
sleep(self.interval)
self.interval = min(self.interval * self.factor, self.max_interval)
if self.timeout > 0 and time.time() - self.start_time > self.timeout:
raise TimeoutError(self.timeout_message)
def reset(self):
self.interval = self.base_interval
self.start_time = None

View File

@ -51,8 +51,10 @@ class AdbDevice(DeviceABC):
def __click_point(self, x: int, y: int) -> None:
for hook in self.click_hooks_before:
logger.debug("execute click hook before")
logger.debug(f"Executing click hook before: ({x}, {y})")
x, y = hook(x, y)
logger.debug(f"Click hook before result: ({x}, {y})")
logger.debug(f"Click: {x}, {y}")
self.device.shell(f"input tap {x} {y}")
def __click_clickable(self, clickable: ClickableObjectProtocol) -> None:

View File

@ -6,60 +6,74 @@ from gettext import gettext as _
from . import R
from .actions.scenes import at_home, goto_home
from .actions.loading import wait_loading_end
from kotonebot import device, image, ocr, action, task, user
from kotonebot import device, image, ocr, color, action, task, user, rect_expand
logger = logging.getLogger(__name__)
@action('前往竞赛页面')
def goto_contest():
def goto_contest() -> bool:
"""
前置条件位于首页 \n
结束状态位于竞赛界面且已经点击了各种奖励领取提示
:return: 是否存在未完成的挑战
"""
device.click(image.expect(R.Common.ButtonContest))
sleep(0.5)
device.click(image.expect_wait(R.Daily.TextContest, colored=True, transparent=True, threshold=0.9999))
btn_contest = image.expect_wait(R.Daily.TextContest, colored=True, transparent=True, threshold=0.9999)
sleep(0.2)
has_ongoing_contest = image.find(R.Daily.TextContestLastOngoing) is not None
device.click(btn_contest)
sleep(0.5)
wait_loading_end()
while not image.find(R.Daily.ButtonContestRanking):
# [screenshots/contest/acquire1.png]
# [screenshots/contest/acquire2.png]
device.click_center()
sleep(1)
# [screenshots/contest/main.png]
if not has_ongoing_contest:
while not image.find(R.Daily.ButtonContestRanking):
# [screenshots/contest/acquire1.png]
# [screenshots/contest/acquire2.png]
device.click_center()
sleep(1)
# [screenshots/contest/main.png]
else:
image.expect_wait(R.Daily.ButtonContestChallengeStart)
return has_ongoing_contest
@action('选择并挑战')
def pick_and_contest() -> bool:
def pick_and_contest(has_ongoing_contest: bool = False) -> bool:
"""
选择并挑战
前置条件位于竞赛界面 \n
结束状态位于竞赛界面
:param has_ongoing_contest: 是否有中断未完成的挑战
:return: 如果返回假说明今天挑战次数已经用完了
"""
image.expect_wait(R.Daily.ButtonContestRanking)
sleep(1) # 等待动画
logger.info('Randomly pick a contestant and start challenge.')
# 随机选一个对手 [screenshots/contest/main.png]
logger.debug('Clicking on contestant.')
contestant = image.wait_for(R.Daily.TextContestOverallStats, timeout=2)
if contestant is None:
logger.info('No contestant found. Today\'s challenge points used up.')
return False
device.click(contestant)
# 挑战开始 [screenshots/contest/start1.png]
logger.debug('Clicking on start button.')
device.click(image.expect_wait(R.Daily.ButtonContestStart))
# 判断是否有中断未完成的挑战
# [screenshots/contest/ongoing.png]
if not has_ongoing_contest:
image.expect_wait(R.Daily.ButtonContestRanking)
sleep(1) # 等待动画
logger.info('Randomly pick a contestant and start challenge.')
# 随机选一个对手 [screenshots/contest/main.png]
logger.debug('Clicking on contestant.')
contestant = image.wait_for(R.Daily.TextContestOverallStats, timeout=2)
if contestant is None:
logger.info('No contestant found. Today\'s challenge points used up.')
return False
device.click(contestant)
# 挑战开始 [screenshots/contest/start1.png]
logger.debug('Clicking on start button.')
device.click(image.expect_wait(R.Daily.ButtonContestStart))
sleep(1)
# 记忆未编成 [screenshots/contest/no_memo.png]
if image.find(R.Daily.TextContestNoMemory):
logger.debug('Memory not set. Using auto-compilation.')
user.warning(_('记忆未编成。将使用自动编成。'), once=True)
device.click(image.expect(R.Daily.ButtonContestChallenge))
# 进入挑战页面 [screenshots/contest/contest1.png]
# [screenshots/contest/contest2.png]
image.expect_wait(R.Daily.ButtonContestChallengeStart)
while not image.find(R.Daily.ButtonContestChallengeStart):
# 记忆未编成 [screenshots/contest/no_memo.png]
if image.find(R.Daily.TextContestNoMemory):
logger.debug('Memory not set. Using auto-compilation.')
user.warning(_('记忆未编成。将使用自动编成。'), once=True)
device.click(image.expect(R.Daily.ButtonContestChallenge))
logger.debug('Waiting for challenge start screen.')
# 勾选跳过所有
if image.find(R.Common.CheckboxUnchecked):
logger.debug('Checking skip all.')
@ -95,9 +109,16 @@ def contest():
logger.info('Contest started.')
if not at_home():
goto_home()
goto_contest()
while pick_and_contest():
sleep(0.3)
btn_contest = image.expect(R.Common.ButtonContest)
notification_dot = rect_expand(btn_contest.rect, top=35, right=35)
if not color.find_rgb('#ff104a', rect=notification_dot):
logger.info('No action needed.')
return
has_ongoing_contest = goto_contest()
while pick_and_contest(has_ongoing_contest):
sleep(1.3)
has_ongoing_contest = False
goto_home()
logger.info('Contest all finished.')
@ -110,9 +131,4 @@ if __name__ == '__main__':
logger.setLevel(logging.DEBUG)
init_context()
# if image.find(R.Common.CheckboxUnchecked):
# logger.debug('Checking skip all.')
# device.click()
# sleep(0.5)
# device.click(image.expect(R.Daily.ButtonIconSkip, colored=True, transparent=True, threshold=0.999))
contest()

View File

@ -2,7 +2,7 @@
import logging
from time import sleep
from kotonebot import task, device, image, cropped
from kotonebot import task, device, image, cropped, AdaptiveWait
from . import R
from .common import Priority
from .actions.loading import loading
@ -21,9 +21,10 @@ def start_game():
image.wait_for(R.Daily.ButonLinkData, timeout=30)
sleep(2)
device.click_center()
wait = AdaptiveWait(timeout=240, timeout_message='Game startup loading timeout')
while True:
while loading():
sleep(3)
wait()
with device.pinned():
if image.find(R.Daily.ButtonHomeCurrent):
break
@ -35,7 +36,7 @@ def start_game():
device.click()
else:
device.click_center()
sleep(2)
wait()
if __name__ == '__main__':
from kotonebot.backend.context import init_context

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 829 KiB