feat(task): 商店购买

This commit is contained in:
XcantloadX 2025-01-10 21:56:15 +08:00
parent 5de0c12157
commit 47e1e4b8fc
35 changed files with 325 additions and 19 deletions

View File

@ -9,3 +9,10 @@ pip install -r requirements.dev.txt
python tools/make_resources.py
```
然后打开 VSCode 设置搜索“SupportRestructured Text”并勾选。
### 截图
建议使用 [XnView MP](https://www.xnview.com/en/xnviewmp/) 进行截图裁剪工作。
XnView MP 可以方便的完成“打开图片 → 选区 → 另存选取为文件”这一操作。
只需要提前设置好右键菜单:
![XnView MP 设置1](./images/xnview_setup1.png)

BIN
images/xnview_setup1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

View File

@ -1,4 +1,4 @@
from .client.protocol import DeviceABC
from .backend.context import ContextOcr, ContextImage, ContextDebug, device, ocr, image, debug
from .backend.util import Rect, fuzz, regex, contains, grayscaled, grayscale_cached
from .backend.util import Rect, fuzz, regex, contains, grayscaled, grayscale_cached, cropper, x, y
from .backend.core import task, action

View File

@ -268,7 +268,7 @@ class ContextImage:
:param template: 模板图像可以是图像路径或 cv2.Mat
:param mask: 掩码图像可以是图像路径或 cv2.Mat
:param transparent: 若为 True则认为输入模板是透明的并自动将透明模板转换为 Mask 图像
:param threshold: 阈值默认为 0.8
:param threshold: 阈值默认为 0.9
:param remove_duplicate: 是否移除重复结果默认为 True
"""
ret = find_many(self.context.device.screenshot(), template, mask, threshold=threshold, colored=colored)

41
kotonebot/backend/core.py Normal file
View File

@ -0,0 +1,41 @@
from typing import Callable, overload, Any
def task(
name: str,
description: str|None = None,
):
"""
`task` 装饰器用于标记一个函数为任务函数
:param name: 任务名称
:param description: 任务描述如果为 None则使用函数的 docstring 作为描述
"""
def _task_decorator(func: Callable):
return func
return _task_decorator
@overload
def action(func: Callable[..., Any]) -> Callable[..., Any]: ...
@overload
def action(name: str, description: str|None = None) -> Callable[..., Any]:
"""
`action` 装饰器用于标记一个函数为动作函数
:param name: 动作名称如果为 None则使用函数的名称作为名称
:param description: 动作描述如果为 None则使用函数的 docstring 作为描述
"""
...
def action(*args, **kwargs):
if len(args) == 1:
func = args[0]
return func
elif len(args) == 2:
name = args[0]
description = args[1]
def _action_decorator(func: Callable):
return func
return _action_decorator
else:
raise ValueError("action() takes 1 or 2 positional arguments but 3 were given")

View File

@ -1,12 +1,15 @@
from functools import lru_cache
import re
import typing
from typing import NamedTuple, Callable
from typing import Literal, NamedTuple, Callable, TYPE_CHECKING
import cv2
from cv2.typing import MatLike
from thefuzz import fuzz as _fuzz
if TYPE_CHECKING:
from kotonebot.client.protocol import DeviceABC
class TaskInfo(NamedTuple):
name: str
description: str
@ -66,7 +69,24 @@ def crop_x(img: MatLike, x1: float, x2: float) -> MatLike:
x2_px = int(w * x2)
return img[:, x1_px:x2_px]
def cropper(x1: float, y1: float, x2: float, y2: float) -> Callable[[MatLike], MatLike]:
CropperParams = tuple[Literal['x', 'y'], float, float]
def x(from_: float=0, to: float=1) -> CropperParams:
return ('x', from_, to)
def y(from_: float=0, to: float=1) -> CropperParams:
return ('y', from_, to)
def cropper(*params: CropperParams) -> Callable[[MatLike], MatLike]:
if len(params) == 0:
raise ValueError("cropper() takes at least 1 positional argument.")
elif len(params) > 2:
raise ValueError("cropper() takes at most 2 positional arguments.")
x1, x2, y1, y2 = 0, 1, 0, 1
for param in params:
if param[0] == 'x':
x1, x2 = param[1], param[2]
elif param[0] == 'y':
y1, y2 = param[1], param[2]
return lambda img: crop(img, x1, y1, x2, y2)
def cropper_y(y1: float, y2: float) -> Callable[[MatLike], MatLike]:
@ -75,6 +95,68 @@ def cropper_y(y1: float, y2: float) -> Callable[[MatLike], MatLike]:
def cropper_x(x1: float, x2: float) -> Callable[[MatLike], MatLike]:
return lambda img: crop_x(img, x1, x2)
class DeviceHookContextManager:
def __init__(
self,
device: 'DeviceABC',
*,
screenshot_hook_before: Callable[[], MatLike|None] | None = None,
screenshot_hook_after: Callable[[MatLike], MatLike] | None = None,
click_hook_before: Callable[[int, int], tuple[int, int]] | None = None,
):
self.device = device
self.screenshot_hook_before = screenshot_hook_before
self.screenshot_hook_after = screenshot_hook_after
self.click_hook_before = click_hook_before
self.old_screenshot_hook_before = self.device.screenshot_hook_before
self.old_screenshot_hook_after = self.device.screenshot_hook_after
def __enter__(self):
if self.screenshot_hook_before is not None:
self.device.screenshot_hook_before = self.screenshot_hook_before
if self.screenshot_hook_after is not None:
self.device.screenshot_hook_after = self.screenshot_hook_after
if self.click_hook_before is not None:
self.device.click_hooks_before.append(self.click_hook_before)
return self.device
def __exit__(self, exc_type, exc_value, traceback):
self.device.screenshot_hook_before = self.old_screenshot_hook_before
self.device.screenshot_hook_after = self.old_screenshot_hook_after
if self.click_hook_before is not None:
self.device.click_hooks_before.remove(self.click_hook_before)
def cropped(
device: 'DeviceABC',
x1: float = 0,
y1: float = 0,
x2: float = 1,
y2: float = 1,
) -> DeviceHookContextManager:
"""
Hook 设备截图与点击操作将截图裁剪为指定区域并调整点击坐标
在进行 OCR 识别或模板匹配时可以先使用此函数缩小图像加快速度
:param device: 设备对象
:param x1: 裁剪区域左上角相对X坐标范围 [0, 1]默认为 0
:param y1: 裁剪区域左上角相对Y坐标范围 [0, 1]默认为 0
:param x2: 裁剪区域右下角相对X坐标范围 [0, 1]默认为 1
:param y2: 裁剪区域右下角相对Y坐标范围 [0, 1]默认为 1
"""
def _screenshot_hook(img: MatLike) -> MatLike:
return crop(img, x1, y1, x2, y2)
def _click_hook(x: int, y: int) -> tuple[int, int]:
w, h = device.screen_size
x_px = int(x1 * w + x)
y_px = int(y1 * h + y)
return x_px, y_px
return DeviceHookContextManager(
device,
screenshot_hook_after=_screenshot_hook,
click_hook_before=_click_hook,
)
def grayscaled(img: MatLike | str) -> MatLike:
if isinstance(img, str):

View File

@ -47,6 +47,9 @@ class AdbDevice(DeviceABC):
self.click(x, y)
def __click_point(self, x: int, y: int) -> None:
for hook in self.click_hooks_before:
logger.debug("execute click hook before")
x, y = hook(x, y)
self.device.shell(f"input tap {x} {y}")
def __click_clickable(self, clickable: ClickableObjectProtocol) -> None:

View File

@ -69,7 +69,9 @@ class DeviceABC(ABC):
self.screenshot_hook_after: Callable[[MatLike], MatLike] | None = None
"""截图后调用的函数"""
self.screenshot_hook_before: Callable[[], MatLike | None] | None = None
"""截图前调用的函数"""
"""截图前调用的函数。返回修改后的截图。"""
self.click_hooks_before: list[Callable[[int, int], tuple[int, int]]] = []
"""点击前调用的函数。返回修改后的点击坐标。"""
self.last_find: Rect | ClickableObjectProtocol | None = None
"""上次 image 对象或 ocr 对象的寻找结果"""
@ -197,7 +199,9 @@ class DeviceABC(ABC):
@property
def screen_size(self) -> tuple[int, int]:
"""
屏幕尺寸
屏幕尺寸
:return: 屏幕尺寸格式为 (width, height)
"""
...

View File

@ -77,7 +77,7 @@ def acquisitions() -> AcquisitionType | None:
device.click(image.expect(R.InPurodyuusu.ButtonLeave))
sleep(0.7)
# 可能需要点击确认
device.click(image.expect(R.InPurodyuusu.ButtonConfirm, threshold=0.8))
device.click(image.expect(R.Common.ButtonConfirm, threshold=0.8))
return "PDrinkMax"
# 技能卡被动领取(支援卡效果)
logger.info("Check skill card acquisition...")

View File

@ -277,7 +277,7 @@ def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
sleep(random.uniform(0.5, 1.5))
device.click(x + w//2, y + h//2)
# 体力溢出提示框
ret = image.wait_for(R.InPurodyuusu.ButtonConfirm, timeout=1)
ret = image.wait_for(R.Common.ButtonConfirm, timeout=1)
if ret is not None:
logger.info("Skill card confirmation dialog detected")
device.click(ret)
@ -472,7 +472,7 @@ def produce_end():
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
sleep(1)
# 決定
device.click(image.expect_wait(R.InPurodyuusu.ButtonConfirm, threshold=0.8))
device.click(image.expect_wait(R.Common.ButtonConfirm, threshold=0.8))
sleep(1)
# 上传图片。注意网络可能会很慢,可能出现上传失败对话框
retry_count = 0
@ -716,13 +716,13 @@ if __name__ == '__main__':
getLogger(__name__).setLevel(logging.DEBUG)
init_context()
# while not image.wait_for_any([
# R.InPurodyuusu.TextPDiary, # 普通周
# R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
# ], timeout=2):
# logger.info("Action scene not detected. Retry...")
# acquisitions()
# sleep(3)
while not image.wait_for_any([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
], timeout=2):
logger.info("Action scene not detected. Retry...")
acquisitions()
sleep(3)
# image.wait_for_any([
# R.InPurodyuusu.TextPDiary, # 普通周
@ -740,7 +740,7 @@ if __name__ == '__main__':
# acquisitions()
# acquire_pdorinku(0)
# image.wait_for(R.InPurodyuusu.InPractice.PDorinkuIcon)
hajime_regular(start_from=12)
# hajime_regular(start_from=12)
# until_practice_scene()
# device.click(image.expect_wait_any([
# R.InPurodyuusu.PSkillCardIconBlue,

View File

@ -45,7 +45,7 @@ def acquire_pdorinku(index: int):
sleep(0.5)
device.click(image.expect(R.InPurodyuusu.ButtonNotAcquire))
sleep(0.5)
device.click(image.expect(R.InPurodyuusu.ButtonConfirm))
device.click(image.expect(R.Common.ButtonConfirm))
else:
# 点击饮料
drinks = list_pdorinku()

View File

@ -0,0 +1,58 @@
import time
from typing import Callable
from .. import R
from kotonebot import device, image, ocr, action, cropper, x, y
def until(
condition: Callable[[], bool],
timeout: float=10,
interval: float=0.5,
critical: bool=False
) -> bool:
"""
等待条件成立如果条件不成立则返回 False 或抛出异常
:param condition: 条件函数
:param timeout: 等待时间单位为秒
:param interval: 检查条件的时间间隔单位为秒
:param critical: 如果条件不成立是否抛出异常
"""
start = time.time()
while not condition():
if time.time() - start > timeout:
if critical:
raise TimeoutError(f"Timeout while waiting for condition {condition.__name__}.")
return False
time.sleep(interval)
return True
@action
def at_home() -> bool:
with device.hook(cropper(y(from_=0.7))):
return image.find(R.Daily.ButtonHomeCurrent) is not None
@action
def at_shop() -> bool:
with device.hook(cropper(y(to=0.3))):
return image.find(R.Daily.IconShopTitle) is not None
@action
def goto_shop():
"""
从首页进入 ショップ
前置条件位于首页 \n
结束状态位于商店页面
"""
device.click(image.expect(R.Daily.ButtonShop))
until(at_shop, critical=True)
if __name__ == "__main__":
from kotonebot.backend.context import init_context
init_context()
print(at_home())
print(at_shop())
goto_shop()

111
kotonebot/tasks/purchase.py Normal file
View File

@ -0,0 +1,111 @@
"""从商店购买物品"""
import logging
from time import sleep
from kotonebot import task
from kotonebot import device, image, ocr, action
from kotonebot.backend.util import cropped
from .actions.scenes import goto_shop
from . import R
logger = logging.getLogger(__name__)
@action
def money_items():
"""
购买マニー物品
前置条件位于商店页面的マニー Tab
"""
logger.info(f'Purchasing マニー items.')
# [screenshots\shop\money1.png]
results = image.find_many(R.Daily.TextShopRecommended)
# device.click(results[0])
index = 1
for index, result in enumerate(results, 1):
# [screenshots\shop\dialog.png]
logger.info(f'Purchasing #{index} item.')
device.click(result)
sleep(0.5)
with cropped(device, y1=0.3):
purchased = image.expect_wait(R.Daily.TextShopPurchased, timeout=1)
if purchased is not None:
logger.info(f'AP item #{index} already purchased.')
continue
comfirm = image.expect_wait(R.Common.ButtonConfirm, timeout=2)
# 如果数量不是最大,调到最大
while image.find(R.Daily.ButtonShopCountAdd, colored=True):
logger.debug('Adjusting quantity(+1)...')
device.click()
sleep(0.3)
logger.debug(f'Confirming purchase...')
device.click(comfirm)
sleep(1.5)
logger.info(f'Purchasing マニー items completed. {index} items purchased.')
@action
def ap_items(item_indices: list[int]):
"""
购买AP物品
前置条件位于商店页面的AP Tab
:param item_indices: 要购买的物品索引列表 0 开始
"""
# [screenshots\shop\ap1.png]
logger.info(f'Purchasing AP items.')
results = image.find_many(R.Daily.IconShopAp, threshold=0.7)
sleep(10)
# 按 X, Y 坐标排序从小到大
results = sorted(results, key=lambda x: (x.position[0], x.position[1]))
for index in item_indices:
if index <= len(results):
logger.info(f'Purchasing #{index} AP item.')
device.click(results[index])
sleep(0.5)
with cropped(device, y1=0.3):
purchased = image.expect_wait(R.Daily.TextShopPurchased, timeout=1)
if purchased is not None:
logger.info(f'AP item #{index} already purchased.')
continue
comfirm = image.expect_wait(R.Common.ButtonConfirm, timeout=2)
# 如果数量不是最大,调到最大
while image.find(R.Daily.ButtonShopCountAdd, colored=True):
logger.debug('Adjusting quantity(+1)...')
device.click()
sleep(0.3)
logger.debug(f'Confirming purchase...')
device.click(comfirm)
sleep(1.5)
else:
logger.warning(f'AP item #{index} not found')
logger.info(f'Purchasing AP items completed. {len(item_indices)} items purchased.')
@task('商店购买')
def purchase():
"""
从商店购买物品
"""
goto_shop()
# 进入每日商店 [screenshots\shop\shop.png]
# [ap1.png]
device.click(image.expect(R.Daily.ButtonDailyShop)) # TODO: memoable
sleep(1)
# 购买マニー物品
image.expect_wait(R.Daily.IconShopMoney)
money_items()
# 点击 AP 选项卡
device.click(image.expect_wait(R.Daily.TextTabShopAp, timeout=2)) # TODO: memoable
# 等待 AP 选项卡加载完成
image.expect_wait(R.Daily.IconShopAp)
ap_items([0, 1, 2, 3])
if __name__ == '__main__':
from kotonebot.backend.context import init_context
import logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
logger.setLevel(logging.DEBUG)
init_context()
# money_items()
# ap_items([0, 1, 3])
purchase()

View File

Before

Width:  |  Height:  |  Size: 3.6 KiB

After

Width:  |  Height:  |  Size: 3.6 KiB

View File

Before

Width:  |  Height:  |  Size: 2.2 KiB

After

Width:  |  Height:  |  Size: 2.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 905 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 778 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 638 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 600 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

BIN
screenshots/shop/ap1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 309 KiB

BIN
screenshots/shop/dialog.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 324 KiB

BIN
screenshots/shop/money1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 629 KiB

BIN
screenshots/shop/shop.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 526 KiB