feat(task): 基本培育流程完成
|
@ -7,6 +7,11 @@
|
|||
"label": "Clear test results",
|
||||
"type": "shell",
|
||||
"command": "powershell \"rm -r tests/output_images\""
|
||||
},
|
||||
{
|
||||
"label": "Make R.py",
|
||||
"type": "shell",
|
||||
"command": "python ./tools/make_resources.py"
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
这个文件夹下放的是临时测试代码。
|
Before Width: | Height: | Size: 935 B After Width: | Height: | Size: 935 B |
After Width: | Height: | Size: 1.0 MiB |
After Width: | Height: | Size: 468 KiB |
|
@ -0,0 +1,66 @@
|
|||
import cv2
|
||||
import numpy as np
|
||||
|
||||
def cv_imread(filePath):
|
||||
cv_img=cv2.imdecode(np.fromfile(filePath,dtype=np.uint8),-1)
|
||||
## imdecode读取的是rgb,如果后续需要opencv处理的话,需要转换成bgr,转换后图片颜色会变化
|
||||
##cv_img=cv2.cvtColor(cv_img,cv2.COLOR_RGB2BGR)
|
||||
return cv_img
|
||||
|
||||
VERTICAL_BOX_COUNT = 12
|
||||
HORIZONTAL_BOX_COUNT = 10
|
||||
LINE_WIDTH = 2
|
||||
FONT_SIZE = 1.2
|
||||
FONT_STROKE_WIDTH = 3
|
||||
COLOR = (0, 255, 0)
|
||||
|
||||
def draw_grid_with_numbers(image_path, output_path):
|
||||
# 读取图像
|
||||
img = cv_imread(image_path)
|
||||
height, width = img.shape[:2]
|
||||
|
||||
# 计算网格大小
|
||||
grid_height = height // VERTICAL_BOX_COUNT
|
||||
grid_width = width // HORIZONTAL_BOX_COUNT
|
||||
|
||||
# 画垂直线
|
||||
for i in range(HORIZONTAL_BOX_COUNT + 1):
|
||||
x = i * grid_width
|
||||
cv2.line(img, (x, 0), (x, height), COLOR, LINE_WIDTH)
|
||||
|
||||
# 画水平线
|
||||
for i in range(VERTICAL_BOX_COUNT + 1):
|
||||
y = i * grid_height
|
||||
cv2.line(img, (0, y), (width, y), COLOR, LINE_WIDTH)
|
||||
|
||||
# 在每个网格中添加编号
|
||||
font = cv2.FONT_HERSHEY_SIMPLEX
|
||||
font_scale = FONT_SIZE
|
||||
number = 0
|
||||
|
||||
for i in range(VERTICAL_BOX_COUNT):
|
||||
for j in range(HORIZONTAL_BOX_COUNT):
|
||||
x = j * grid_width + grid_width // 2
|
||||
y = i * grid_height + grid_height // 2
|
||||
|
||||
# 获取文本大小以居中显示
|
||||
text = str(number)
|
||||
(text_width, text_height), _ = cv2.getTextSize(text, font, font_scale, 1)
|
||||
text_x = int(x - text_width // 2)
|
||||
text_y = int(y + text_height // 2)
|
||||
|
||||
cv2.putText(img, text, (text_x, text_y), font, font_scale, COLOR, FONT_STROKE_WIDTH)
|
||||
number += 1
|
||||
|
||||
# 保存图像
|
||||
cv2.imwrite(output_path, img)
|
||||
return img
|
||||
|
||||
# 使用示例
|
||||
image_path = r"C:\Users\user\Documents\MuMu共享文件夹\Screenshots\MuMu12-20250105-100943.png"
|
||||
# 复制原图为 ai_grid_original.jpg
|
||||
import shutil
|
||||
shutil.copy(image_path, "ai_grid_original.jpg")
|
||||
|
||||
output_path = "ai_grid_pre_process.jpg"
|
||||
result = draw_grid_with_numbers(image_path, output_path)
|
Before Width: | Height: | Size: 9.4 KiB After Width: | Height: | Size: 9.4 KiB |
Before Width: | Height: | Size: 6.7 KiB After Width: | Height: | Size: 6.7 KiB |
Before Width: | Height: | Size: 7.2 KiB After Width: | Height: | Size: 7.2 KiB |
|
@ -1,12 +1,10 @@
|
|||
import cv2
|
||||
from cv2.typing import MatLike
|
||||
import numpy as np
|
||||
from client.device.adb import AdbDevice
|
||||
from kotonebot.client.device.adb import AdbDevice
|
||||
from adbutils import adb
|
||||
from typing import NamedTuple
|
||||
# 初始化ADB设备
|
||||
adb.connect("127.0.0.1:16384")
|
||||
device = AdbDevice(adb.device_list()[0])
|
||||
|
||||
|
||||
# 定义检测参数
|
||||
TARGET_ASPECT_RATIO_RANGE = (0.73, 0.80)
|
||||
|
@ -164,30 +162,40 @@ def detect_cards(image: MatLike, card_dimensions: list[tuple[int, int, int, int]
|
|||
return card_contours
|
||||
|
||||
def main():
|
||||
# while True:
|
||||
# # 获取屏幕截图
|
||||
# img = device.screenshot()
|
||||
# 初始化ADB设备
|
||||
# adb.connect("127.0.0.1:16384")
|
||||
# device = AdbDevice(adb.device_list()[0])
|
||||
CARD_POSITIONS_4 = [
|
||||
(17, 883, 192, 252),
|
||||
(182, 883, 192, 252),
|
||||
(346, 883, 192, 252),
|
||||
(511, 883, 192, 252),
|
||||
# delta_x = 165, delta_x-width = -27
|
||||
]
|
||||
while True:
|
||||
# 获取屏幕截图
|
||||
img = cv2.imread('tests/images/produce/recommended_card_4_3_0.png')
|
||||
|
||||
# # 检测卡片
|
||||
# cards = detect_cards(img)
|
||||
# 检测卡片
|
||||
cards = detect_cards(img, CARD_POSITIONS_4)
|
||||
|
||||
# # 如果检测到3个或更多卡片
|
||||
# if len(cards) >= 3:
|
||||
# print("检测到3个卡片!")
|
||||
# # 在图像上绘制检测结果
|
||||
# for i, (x, y, w, h, glow) in enumerate(cards[:3]):
|
||||
# cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
|
||||
# cv2.putText(img, f"Card {i+1}", (x, y-10),
|
||||
# cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
|
||||
# 如果检测到3个或更多卡片
|
||||
if len(cards) >= 3:
|
||||
print("检测到3个卡片!")
|
||||
# 在图像上绘制检测结果
|
||||
for i, (x, y, w, h, glow, is_target) in enumerate(cards[:3]):
|
||||
cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
|
||||
cv2.putText(img, f"Card {i+1}", (x, y-10),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
|
||||
|
||||
# # 显示结果
|
||||
# cv2.imshow("Detected Cards", img)
|
||||
# # cv2.waitKey(0)
|
||||
# # cv2.destroyAllWindows()
|
||||
# # break
|
||||
# 显示结果
|
||||
cv2.imshow("Detected Cards", img)
|
||||
# cv2.waitKey(0)
|
||||
# cv2.destroyAllWindows()
|
||||
# break
|
||||
|
||||
# # 等待1秒后继续检测
|
||||
# cv2.waitKey(1000)
|
||||
# 等待1秒后继续检测
|
||||
cv2.waitKey(1000)
|
||||
|
||||
from kotonebot.client.device.fast_screenshot import AdbFastScreenshots
|
||||
with AdbFastScreenshots(
|
Before Width: | Height: | Size: 1.1 MiB After Width: | Height: | Size: 1.1 MiB |
Before Width: | Height: | Size: 1.3 MiB After Width: | Height: | Size: 1.3 MiB |
Before Width: | Height: | Size: 520 KiB After Width: | Height: | Size: 520 KiB |
Before Width: | Height: | Size: 52 KiB After Width: | Height: | Size: 52 KiB |
Before Width: | Height: | Size: 461 B After Width: | Height: | Size: 461 B |
|
@ -1,15 +1,15 @@
|
|||
from kotonebot.client.protocol import DeviceProtocol
|
||||
from .backend.context import ContextOcr, ContextImage, ContextDebug, _c
|
||||
from .backend.util import Rect, fuzz, regex, contains
|
||||
from kotonebot.client.protocol import DeviceABC
|
||||
from .backend.context import ContextOcr, ContextImage, ContextDebug, device, ocr, image, debug
|
||||
from .backend.util import Rect, fuzz, regex, contains, grayscale
|
||||
|
||||
device: DeviceProtocol
|
||||
ocr: ContextOcr
|
||||
image: ContextImage
|
||||
debug: ContextDebug
|
||||
# device: DeviceProtocol
|
||||
# ocr: ContextOcr
|
||||
# image: ContextImage
|
||||
# debug: ContextDebug
|
||||
|
||||
def __getattr__(name: str):
|
||||
try:
|
||||
return getattr(_c, name)
|
||||
except AttributeError:
|
||||
return globals()[name]
|
||||
# def __getattr__(name: str):
|
||||
# try:
|
||||
# return getattr(_c, name)
|
||||
# except AttributeError:
|
||||
# return globals()[name]
|
||||
|
||||
|
|
|
@ -8,13 +8,13 @@ from typing import Callable, TYPE_CHECKING, cast, overload, Any, TypeVar, Litera
|
|||
from kotonebot.client.device.adb import AdbDevice
|
||||
|
||||
import cv2
|
||||
if TYPE_CHECKING:
|
||||
from cv2.typing import MatLike
|
||||
from cv2.typing import MatLike
|
||||
|
||||
|
||||
import kotonebot.backend.image as raw_image
|
||||
from kotonebot.backend.image import CropResult, TemplateMatchResult, find_crop, expect, find
|
||||
from kotonebot.backend.image import CropResult, TemplateMatchResult, find_crop, expect, find, find_any
|
||||
from kotonebot.backend.util import Rect
|
||||
from kotonebot.client import DeviceProtocol
|
||||
from kotonebot.client import DeviceABC
|
||||
from kotonebot.backend.ocr import Ocr, OcrResult, jp, en, StringMatchFunction
|
||||
|
||||
OcrLanguage = Literal['jp', 'en']
|
||||
|
@ -25,7 +25,7 @@ class ContextOcr:
|
|||
self.context = context
|
||||
self.__engine = jp
|
||||
|
||||
def raw(self, lang: OcrLanguage) -> Ocr:
|
||||
def raw(self, lang: OcrLanguage = 'jp') -> Ocr:
|
||||
"""
|
||||
返回 `kotonebot.backend.ocr` 中的 Ocr 对象。\n
|
||||
Ocr 对象与此对象的区别是,此对象会自动截图,而 Ocr 对象需要手动传入图像参数。
|
||||
|
@ -65,9 +65,13 @@ class ContextOcr:
|
|||
def find(self, *args, **kwargs) -> OcrResult | None:
|
||||
"""检查指定图像是否包含指定文本。"""
|
||||
if len(args) == 1 and len(kwargs) == 0:
|
||||
return self.__engine.find(self.context.device.screenshot(), args[0])
|
||||
ret = self.__engine.find(self.context.device.screenshot(), args[0])
|
||||
self.context.device.last_find = ret
|
||||
return ret
|
||||
elif len(args) == 2 and len(kwargs) == 0:
|
||||
return self.__engine.find(args[0], args[1])
|
||||
ret = self.__engine.find(args[0], args[1])
|
||||
self.context.device.last_find = ret
|
||||
return ret
|
||||
else:
|
||||
raise ValueError("Invalid arguments")
|
||||
|
||||
|
@ -80,7 +84,9 @@ class ContextOcr:
|
|||
|
||||
与 `find()` 的区别在于,`expect()` 未找到时会抛出异常。
|
||||
"""
|
||||
return self.__engine.expect(self.context.device.screenshot(), pattern)
|
||||
ret = self.__engine.expect(self.context.device.screenshot(), pattern)
|
||||
self.context.device.last_find = ret
|
||||
return ret
|
||||
|
||||
def expect_wait(self, pattern: str | re.Pattern | StringMatchFunction, timeout: float = 10) -> OcrResult:
|
||||
"""
|
||||
|
@ -90,6 +96,7 @@ class ContextOcr:
|
|||
while True:
|
||||
result = self.find(pattern)
|
||||
if result is not None:
|
||||
self.context.device.last_find = result
|
||||
return result
|
||||
if time.time() - start_time > timeout:
|
||||
raise TimeoutError(f"Timeout waiting for {pattern}")
|
||||
|
@ -103,6 +110,7 @@ class ContextOcr:
|
|||
while True:
|
||||
result = self.find(pattern)
|
||||
if result is not None:
|
||||
self.context.device.last_find = result
|
||||
return result
|
||||
if time.time() - start_time > timeout:
|
||||
return None
|
||||
|
@ -116,16 +124,18 @@ class ContextImage:
|
|||
def raw(self):
|
||||
return raw_image
|
||||
|
||||
def wait_for(self, template: str, mask: str | None = None, threshold: float = 0.9, timeout: float = 10) -> bool:
|
||||
def wait_for(self, template: str, mask: str | None = None, threshold: float = 0.9, timeout: float = 10) -> TemplateMatchResult | None:
|
||||
"""
|
||||
等待指定图像出现。
|
||||
"""
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if self.find(template, mask, threshold):
|
||||
return True
|
||||
ret = self.find(template, mask, threshold)
|
||||
if ret is not None:
|
||||
self.context.device.last_find = ret
|
||||
return ret
|
||||
if time.time() - start_time > timeout:
|
||||
return False
|
||||
return None
|
||||
time.sleep(0.1)
|
||||
|
||||
def wait_for_any(self, templates: list[str], masks: list[str | None] | None = None, threshold: float = 0.9, timeout: float = 10):
|
||||
|
@ -159,6 +169,7 @@ class ContextImage:
|
|||
while True:
|
||||
ret = self.find(template, mask, threshold)
|
||||
if ret is not None:
|
||||
self.context.device.last_find = ret
|
||||
return ret
|
||||
if time.time() - start_time > timeout:
|
||||
raise TimeoutError(f"Timeout waiting for {template}")
|
||||
|
@ -183,6 +194,7 @@ class ContextImage:
|
|||
for template, mask in zip(templates, _masks):
|
||||
ret = self.find(template, mask, threshold)
|
||||
if ret is not None:
|
||||
self.context.device.last_find = ret
|
||||
return ret
|
||||
if time.time() - start_time > timeout:
|
||||
raise TimeoutError(f"Timeout waiting for any of {templates}")
|
||||
|
@ -195,15 +207,32 @@ class ContextImage:
|
|||
|
||||
与 `find()` 的区别在于,`expect()` 未找到时会抛出异常。
|
||||
"""
|
||||
return expect(self.context.device.screenshot(), template, mask, threshold=threshold)
|
||||
ret = expect(self.context.device.screenshot(), template, mask, threshold=threshold)
|
||||
self.context.device.last_find = ret
|
||||
return ret
|
||||
|
||||
def find(self, template: str, mask: str | None = None, threshold: float = 0.9):
|
||||
def find(self, template: 'str | MatLike', mask: str | None = None, threshold: float = 0.9):
|
||||
"""
|
||||
寻找指定图像。
|
||||
"""
|
||||
return find(self.context.device.screenshot(), template, mask, threshold=threshold)
|
||||
ret = find(self.context.device.screenshot(), template, mask, threshold=threshold)
|
||||
self.context.device.last_find = ret
|
||||
return ret
|
||||
|
||||
def find_crop(
|
||||
def find_any(
|
||||
self,
|
||||
templates: list[str | MatLike],
|
||||
masks: list[str | MatLike | None] | None = None,
|
||||
threshold: float = 0.9
|
||||
) -> TemplateMatchResult | None:
|
||||
"""
|
||||
寻找指定图像中的任意一个。
|
||||
"""
|
||||
ret = find_any(self.context.device.screenshot(), templates, masks, threshold=threshold)
|
||||
self.context.device.last_find = ret
|
||||
return ret
|
||||
|
||||
def find_crop_many(
|
||||
self,
|
||||
template: str,
|
||||
mask: str | None = None,
|
||||
|
@ -244,15 +273,15 @@ class ContextDebug:
|
|||
cv2.waitKey(1)
|
||||
|
||||
|
||||
@cache
|
||||
def _forward_from(getter: Callable[[], T]) -> T:
|
||||
class Forwarded:
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
return getattr(getter(), name)
|
||||
class Forwarded:
|
||||
def __init__(self, getter: Callable[[], T] | None = None, name: str | None = None):
|
||||
self.getter = getter
|
||||
self.name = name
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"Forwarded({object})"
|
||||
return cast(T, Forwarded())
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
if self.getter is None:
|
||||
raise ValueError(f"Forwarded object {self.name} called before initialization.")
|
||||
return getattr(self.getter(), name)
|
||||
|
||||
class Context:
|
||||
def __init__(self):
|
||||
|
@ -260,48 +289,58 @@ class Context:
|
|||
from adbutils import adb
|
||||
adb.connect('127.0.0.1:16384')
|
||||
self.__device = AdbDevice(adb.device_list()[0])
|
||||
# self.__device = None
|
||||
self.__ocr = ContextOcr(self)
|
||||
self.__image = ContextImage(self)
|
||||
self.__vars = ContextGlobalVars()
|
||||
self.__debug = ContextDebug(self)
|
||||
self.actions = []
|
||||
|
||||
def inject_device(self, device: DeviceProtocol):
|
||||
def inject_device(self, device: DeviceABC):
|
||||
self.__device = device
|
||||
|
||||
@property
|
||||
def device(self) -> DeviceProtocol:
|
||||
return cast(DeviceProtocol, _forward_from(lambda: self.__device))
|
||||
def device(self) -> DeviceABC:
|
||||
return self.__device
|
||||
|
||||
@property
|
||||
def ocr(self) -> 'ContextOcr':
|
||||
return cast(ContextOcr, _forward_from(lambda: self.__ocr))
|
||||
return self.__ocr
|
||||
|
||||
@property
|
||||
def image(self) -> 'ContextImage':
|
||||
return cast(ContextImage, _forward_from(lambda: self.__image))
|
||||
return self.__image
|
||||
|
||||
@property
|
||||
def vars(self) -> 'ContextGlobalVars':
|
||||
return cast(ContextGlobalVars, _forward_from(lambda: self.__vars))
|
||||
return self.__vars
|
||||
|
||||
@property
|
||||
def debug(self) -> 'ContextDebug':
|
||||
return cast(ContextDebug, _forward_from(lambda: self.__debug))
|
||||
return self.__debug
|
||||
|
||||
# 暴露 Context 的属性到模块级别
|
||||
_c = Context()
|
||||
device: DeviceProtocol = _c.device
|
||||
_c: Context
|
||||
device: DeviceABC = cast(DeviceABC, Forwarded(name="device"))
|
||||
"""当前正在执行任务的设备。"""
|
||||
ocr: ContextOcr = _c.ocr
|
||||
ocr: ContextOcr = cast(ContextOcr, Forwarded(name="ocr"))
|
||||
"""OCR 引擎。"""
|
||||
image: ContextImage = _c.image
|
||||
image: ContextImage = cast(ContextImage, Forwarded(name="image"))
|
||||
"""图像识别。"""
|
||||
vars: ContextGlobalVars = _c.vars
|
||||
vars: ContextGlobalVars = cast(ContextGlobalVars, Forwarded(name="vars"))
|
||||
"""全局变量。"""
|
||||
debug: ContextDebug = _c.debug
|
||||
debug: ContextDebug = cast(ContextDebug, Forwarded(name="debug"))
|
||||
"""调试工具。"""
|
||||
|
||||
# def __getattr__(name: str) -> Any:
|
||||
# return getattr(_c, name)
|
||||
|
||||
def init_context():
|
||||
global _c, device, ocr, image, vars, debug
|
||||
_c = Context()
|
||||
device.getter = lambda: _c.device # type: ignore
|
||||
ocr.getter = lambda: _c.ocr # type: ignore
|
||||
image.getter = lambda: _c.image # type: ignore
|
||||
vars.getter = lambda: _c.vars # type: ignore
|
||||
debug.getter = lambda: _c.debug # type: ignore
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import os
|
||||
from typing import NamedTuple, Protocol, TypeVar
|
||||
from logging import getLogger
|
||||
|
||||
|
@ -90,7 +91,11 @@ def template_match(
|
|||
:param max_results: 最大结果数,默认为 1。
|
||||
:param remove_duplicate: 是否移除重复结果,默认为 True。
|
||||
"""
|
||||
logger.debug(f'match template: {template} threshold: {threshold} max_results: {max_results}')
|
||||
if isinstance(template, str):
|
||||
_template_name = os.path.relpath(template)
|
||||
else:
|
||||
_template_name = '<opencv Mat>'
|
||||
logger.debug(f'match template: {_template_name} threshold: {threshold} max_results: {max_results}')
|
||||
# 统一参数
|
||||
template = _unify_image(template)
|
||||
image = _unify_image(image)
|
||||
|
@ -152,6 +157,24 @@ def find(
|
|||
matches = template_match(template, image, mask, transparent, threshold, max_results=-1)
|
||||
return matches[0] if len(matches) > 0 else None
|
||||
|
||||
def find_any(
|
||||
image: MatLike,
|
||||
templates: list[MatLike | str],
|
||||
masks: list[MatLike | str | None] | None = None,
|
||||
transparent: bool = False,
|
||||
threshold: float = 0.8,
|
||||
) -> TemplateMatchResult | None:
|
||||
"""指定多个模板,返回第一个匹配到的结果"""
|
||||
if masks is None:
|
||||
_masks = [None] * len(templates)
|
||||
else:
|
||||
_masks = masks
|
||||
for template, mask in zip(templates, _masks):
|
||||
ret = find(image, template, mask, transparent, threshold)
|
||||
if ret is not None:
|
||||
return ret
|
||||
return None
|
||||
|
||||
def count(
|
||||
image: MatLike,
|
||||
template: MatLike | str,
|
||||
|
|
|
@ -3,6 +3,7 @@ import re
|
|||
import typing
|
||||
from typing import NamedTuple, Callable
|
||||
|
||||
import cv2
|
||||
from cv2.typing import MatLike
|
||||
from thefuzz import fuzz as _fuzz
|
||||
|
||||
|
@ -73,3 +74,14 @@ def cropper_y(y1: float, y2: float) -> Callable[[MatLike], MatLike]:
|
|||
|
||||
def cropper_x(x1: float, x2: float) -> Callable[[MatLike], MatLike]:
|
||||
return lambda img: crop_x(img, x1, x2)
|
||||
|
||||
|
||||
def grayscale(img: MatLike | str) -> MatLike:
|
||||
if isinstance(img, str):
|
||||
img = cv2.imread(img)
|
||||
return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
@lru_cache
|
||||
def grayscale_cached(img: MatLike | str) -> MatLike:
|
||||
return grayscale(img)
|
||||
|
||||
|
|
|
@ -1 +1 @@
|
|||
from .protocol import DeviceProtocol
|
||||
from .protocol import DeviceABC
|
|
@ -1,24 +1,30 @@
|
|||
import logging
|
||||
from typing import Callable, cast
|
||||
import cv2
|
||||
|
||||
import numpy as np
|
||||
import cv2
|
||||
from cv2.typing import MatLike
|
||||
|
||||
from kotonebot.backend.util import Rect, is_rect
|
||||
from ..protocol import DeviceProtocol, ClickableObjectProtocol
|
||||
|
||||
from adbutils import AdbClient, adb
|
||||
from adbutils._device import AdbDevice as Device
|
||||
|
||||
class AdbDevice(DeviceProtocol):
|
||||
from kotonebot.backend.util import Rect, is_rect
|
||||
from ..protocol import DeviceABC, ClickableObjectProtocol
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class AdbDevice(DeviceABC):
|
||||
def __init__(self, device: Device) -> None:
|
||||
super().__init__()
|
||||
self.device = device
|
||||
self.screenshot_hook: Callable[[MatLike], MatLike] | None = None
|
||||
|
||||
def launch_app(self, package_name: str) -> None:
|
||||
self.device.shell(f"monkey -p {package_name} 1")
|
||||
|
||||
def click(self, arg1, arg2=None) -> None:
|
||||
if is_rect(arg1):
|
||||
def click(self, arg1=None, arg2=None) -> None:
|
||||
if arg1 is None:
|
||||
self.__click_last()
|
||||
elif is_rect(arg1):
|
||||
self.__click_rect(arg1)
|
||||
elif isinstance(arg1, int) and isinstance(arg2, int):
|
||||
self.__click_point(arg1, arg2)
|
||||
|
@ -27,6 +33,11 @@ class AdbDevice(DeviceProtocol):
|
|||
else:
|
||||
raise ValueError(f"Invalid arguments: {arg1}, {arg2}")
|
||||
|
||||
def __click_last(self) -> None:
|
||||
if self.last_find is None:
|
||||
raise ValueError("No last find result")
|
||||
self.click(self.last_find)
|
||||
|
||||
def __click_rect(self, rect: Rect) -> None:
|
||||
# 从矩形中心的 60% 内部随机选择一点
|
||||
x = rect[0] + rect[2] // 2 + np.random.randint(-int(rect[2] * 0.3), int(rect[2] * 0.3))
|
||||
|
@ -45,11 +56,20 @@ class AdbDevice(DeviceProtocol):
|
|||
self.device.shell(f"input swipe {x1} {y1} {x2} {y2} {duration}")
|
||||
|
||||
def screenshot(self) -> MatLike:
|
||||
img = cv2.cvtColor(np.array(self.device.screenshot()), cv2.COLOR_RGB2BGR)
|
||||
if self.screenshot_hook is not None:
|
||||
img = self.screenshot_hook(img)
|
||||
if self.screenshot_hook_before is not None:
|
||||
logger.debug("execute screenshot hook before")
|
||||
img = self.screenshot_hook_before()
|
||||
if img is not None:
|
||||
logger.debug("screenshot hook before returned image")
|
||||
return img
|
||||
img = self.screenshot_raw()
|
||||
if self.screenshot_hook_after is not None:
|
||||
img = self.screenshot_hook_after(img)
|
||||
return img
|
||||
|
||||
def screenshot_raw(self) -> MatLike:
|
||||
return cv2.cvtColor(np.array(self.device.screenshot()), cv2.COLOR_RGB2BGR)
|
||||
|
||||
@property
|
||||
def screen_size(self) -> tuple[int, int]:
|
||||
ret = cast(str, self.device.shell("wm size")).strip('Physical size: ')
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from time import sleep
|
||||
from typing import Callable, Protocol, TYPE_CHECKING, overload, runtime_checkable
|
||||
from abc import ABC
|
||||
|
||||
from cv2.typing import MatLike
|
||||
|
||||
|
@ -22,24 +23,55 @@ class DeviceScreenshotProtocol(Protocol):
|
|||
...
|
||||
|
||||
class HookContextManager:
|
||||
def __init__(self, device: 'DeviceProtocol', func: Callable[[MatLike], MatLike]):
|
||||
def __init__(self, device: 'DeviceABC', func: Callable[[MatLike], MatLike]):
|
||||
self.device = device
|
||||
self.func = func
|
||||
self.old_func = device.screenshot_hook
|
||||
self.old_func = device.screenshot_hook_after
|
||||
|
||||
def __enter__(self):
|
||||
self.device.screenshot_hook = self.func
|
||||
self.device.screenshot_hook_after = self.func
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.device.screenshot_hook = self.old_func
|
||||
self.device.screenshot_hook_after = self.old_func
|
||||
|
||||
class DeviceProtocol(Protocol):
|
||||
"""
|
||||
针对单个设备可执行的操作的协议/接口。
|
||||
"""
|
||||
|
||||
screenshot_hook: Callable[[MatLike], MatLike] | None
|
||||
class PinContextManager:
|
||||
def __init__(self, device: 'DeviceABC'):
|
||||
self.device = device
|
||||
self.old_hook = device.screenshot_hook_before
|
||||
self.memo = None
|
||||
|
||||
def __hook(self) -> MatLike:
|
||||
if self.memo is None:
|
||||
self.memo = self.device.screenshot_raw()
|
||||
return self.memo
|
||||
|
||||
def __enter__(self):
|
||||
self.device.screenshot_hook_before = self.__hook
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.device.screenshot_hook_before = self.old_hook
|
||||
|
||||
def update(self) -> None:
|
||||
"""
|
||||
更新记住的截图
|
||||
"""
|
||||
self.memo = self.device.screenshot_raw()
|
||||
|
||||
|
||||
class DeviceABC(ABC):
|
||||
"""
|
||||
针对单个设备可执行的操作的抽象基类。
|
||||
"""
|
||||
def __init__(self) -> None:
|
||||
self.screenshot_hook_after: Callable[[MatLike], MatLike] | None = None
|
||||
"""截图后调用的函数"""
|
||||
self.screenshot_hook_before: Callable[[], MatLike | None] | None = None
|
||||
"""截图前调用的函数"""
|
||||
self.last_find: Rect | ClickableObjectProtocol | None = None
|
||||
"""上次 image 对象或 ocr 对象的寻找结果"""
|
||||
|
||||
@staticmethod
|
||||
def list_devices() -> list[str]:
|
||||
|
@ -50,7 +82,17 @@ class DeviceProtocol(Protocol):
|
|||
根据包名启动 app
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
@overload
|
||||
def click(self) -> None:
|
||||
"""
|
||||
点击上次 `image` 对象或 `ocr` 对象的寻找结果(仅包括返回单个结果的函数)。
|
||||
(不包括 `image.raw()` 和 `ocr.raw()` 的结果。)
|
||||
|
||||
如果没有上次寻找结果或上次寻找结果为空,会抛出异常 ValueError。
|
||||
"""
|
||||
...
|
||||
|
||||
@overload
|
||||
def click(self, x: int, y: int) -> None:
|
||||
"""
|
||||
|
@ -72,6 +114,9 @@ class DeviceProtocol(Protocol):
|
|||
"""
|
||||
...
|
||||
|
||||
def click(self, *args, **kwargs) -> None:
|
||||
...
|
||||
|
||||
def click_center(self) -> None:
|
||||
"""
|
||||
点击屏幕中心
|
||||
|
@ -128,12 +173,27 @@ class DeviceProtocol(Protocol):
|
|||
"""
|
||||
...
|
||||
|
||||
def screenshot_raw(self) -> MatLike:
|
||||
"""
|
||||
截图,不调用任何 Hook。
|
||||
"""
|
||||
...
|
||||
|
||||
def hook(self, func: Callable[[MatLike], MatLike]) -> HookContextManager:
|
||||
"""
|
||||
注册 Hook,在截图前将会调用此函数,对截图进行处理
|
||||
"""
|
||||
return HookContextManager(self, func)
|
||||
|
||||
def pinned(self) -> PinContextManager:
|
||||
"""
|
||||
记住下次截图结果,并将截图调整为手动挡。
|
||||
之后截图都会返回记住的数据,节省重复截图时间。
|
||||
|
||||
调用返回对象中的 PinContextManager.update() 可以立刻更新记住的截图。
|
||||
"""
|
||||
return PinContextManager(self)
|
||||
|
||||
@property
|
||||
def screen_size(self) -> tuple[int, int]:
|
||||
"""
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import random
|
||||
import re
|
||||
import time
|
||||
from typing import Literal
|
||||
from typing_extensions import deprecated
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
@ -9,14 +10,16 @@ import logging
|
|||
from time import sleep
|
||||
|
||||
from kotonebot import ocr, device, fuzz, contains, image, debug, regex
|
||||
from kotonebot.backend.util import crop_y, cropper_y
|
||||
from kotonebot.backend.context import init_context
|
||||
from kotonebot.backend.util import crop_y, cropper_y, grayscale, grayscale_cached
|
||||
from kotonebot.tasks import R
|
||||
from kotonebot.tasks.actions import loading
|
||||
from kotonebot.tasks.actions.pdorinku import acquire_pdorinku
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def enter_recommended_action(final_week: bool = False) -> bool:
|
||||
ActionType = None | Literal['lesson', 'rest']
|
||||
def enter_recommended_action(final_week: bool = False) -> ActionType:
|
||||
"""
|
||||
在行动选择页面,执行推荐行动
|
||||
|
||||
|
@ -29,25 +32,25 @@ def enter_recommended_action(final_week: bool = False) -> bool:
|
|||
ret = ocr.wait_for(regex('ボーカル|ダンス|ビジュアル|休|体力'))
|
||||
logger.debug("ocr.wait_for: %s", ret)
|
||||
if ret is None:
|
||||
return False
|
||||
return None
|
||||
if not final_week:
|
||||
if "ボーカル" in ret.text:
|
||||
lesson_text = "Vo.レッスン"
|
||||
lesson_text = "Vo"
|
||||
elif "ダンス" in ret.text:
|
||||
lesson_text = "Da.レッスン"
|
||||
lesson_text = "Da"
|
||||
elif "ビジュアル" in ret.text:
|
||||
lesson_text = "Vi.レッスン"
|
||||
lesson_text = "Vi"
|
||||
elif "休" in ret.text or "体力" in ret.text:
|
||||
rest()
|
||||
return True
|
||||
return 'rest'
|
||||
else:
|
||||
return False
|
||||
return None
|
||||
logger.info("Rec. lesson: %s", lesson_text)
|
||||
# 点击课程
|
||||
logger.debug("Try clicking lesson...")
|
||||
lesson_ret = ocr.expect(contains(lesson_text))
|
||||
device.double_click(lesson_ret.rect)
|
||||
return True
|
||||
return 'lesson'
|
||||
else:
|
||||
if "ボーカル" in ret.text:
|
||||
template = R.InPurodyuusu.ButtonFinalPracticeVocal
|
||||
|
@ -56,10 +59,10 @@ def enter_recommended_action(final_week: bool = False) -> bool:
|
|||
elif "ビジュアル" in ret.text:
|
||||
template = R.InPurodyuusu.ButtonFinalPracticeVisual
|
||||
else:
|
||||
return False
|
||||
return None
|
||||
logger.debug("Try clicking lesson...")
|
||||
device.double_click(image.expect_wait(template))
|
||||
return True
|
||||
return 'lesson'
|
||||
|
||||
def before_start_action():
|
||||
"""检测支援卡剧情、领取资源等"""
|
||||
|
@ -86,27 +89,28 @@ def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
|
|||
|
||||
# 固定的卡片坐标 (for 720x1280)
|
||||
CARD_POSITIONS_1 = [
|
||||
(264, 883, 192, 252)
|
||||
# 格式:(x, y, w, h, return_value)
|
||||
(264, 883, 192, 252, 0)
|
||||
]
|
||||
CARD_POSITIONS_2 = [
|
||||
(156, 883, 192, 252),
|
||||
(372, 883, 192, 252),
|
||||
(156, 883, 192, 252, 1),
|
||||
(372, 883, 192, 252, 2),
|
||||
# delta_x = 216, delta_x-width = 24
|
||||
]
|
||||
CARD_POSITIONS_3 = [
|
||||
(47, 883, 192, 252), # 左卡片 (x, y, w, h)
|
||||
(264, 883, 192, 252), # 中卡片
|
||||
(481, 883, 192, 252) # 右卡片
|
||||
(47, 883, 192, 252, 0), # 左卡片 (x, y, w, h)
|
||||
(264, 883, 192, 252, 1), # 中卡片
|
||||
(481, 883, 192, 252, 2) # 右卡片
|
||||
# delta_x = 217, delta_x-width = 25
|
||||
]
|
||||
CARD_POSITIONS_4 = [
|
||||
(17, 883, 192, 252),
|
||||
(182, 883, 192, 252),
|
||||
(346, 883, 192, 252),
|
||||
(511, 883, 192, 252),
|
||||
(17, 883, 192, 252, 0),
|
||||
(182, 883, 192, 252, 1),
|
||||
(346, 883, 192, 252, 2),
|
||||
(511, 883, 192, 252, 3),
|
||||
# delta_x = 165, delta_x-width = -27
|
||||
]
|
||||
SKIP_POSITION = (621, 739, 85, 85)
|
||||
SKIP_POSITION = (621, 739, 85, 85, 10)
|
||||
|
||||
@deprecated('此方法待改进')
|
||||
def calc_pos(card_count: int):
|
||||
|
@ -160,14 +164,24 @@ def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
|
|||
else:
|
||||
raise ValueError(f"Unsupported card count: {card_count}")
|
||||
|
||||
logger.debug("等待截图...")
|
||||
if card_count == 4:
|
||||
# 随机选择一张卡片点击
|
||||
# TODO: 支持对四张卡片进行检测
|
||||
logger.warning("4 cards detected, detecting glowing card in 4 cards is not supported yet.")
|
||||
logger.info("Click random card")
|
||||
card_index = random.randint(0, 3)
|
||||
device.click(CARD_POSITIONS_4[card_index][:4])
|
||||
sleep(1)
|
||||
device.click(CARD_POSITIONS_4[card_index][:4])
|
||||
return card_index
|
||||
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
img = device.screenshot()
|
||||
|
||||
# 检测卡片
|
||||
card_glows = []
|
||||
for x, y, w, h in calc_pos2(card_count) + [SKIP_POSITION]:
|
||||
for x, y, w, h, return_value in calc_pos2(card_count) + [SKIP_POSITION]:
|
||||
# 获取扩展后的卡片区域坐标
|
||||
outer_x = max(0, x - GLOW_EXTENSION)
|
||||
outer_y = max(0, y - GLOW_EXTENSION)
|
||||
|
@ -193,7 +207,7 @@ def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
|
|||
# 计算环形区域的荧光值
|
||||
glow_value = cv2.countNonZero(ring_mask)
|
||||
|
||||
card_glows.append((x, y, w, h, glow_value))
|
||||
card_glows.append((x, y, w, h, glow_value, return_value))
|
||||
|
||||
# 找到荧光值最高的卡片
|
||||
if not card_glows:
|
||||
|
@ -201,7 +215,7 @@ def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
|
|||
continue
|
||||
else:
|
||||
max_glow_card = max(card_glows, key=lambda x: x[4])
|
||||
x, y, w, h, glow_value = max_glow_card
|
||||
x, y, w, h, glow_value, return_value = max_glow_card
|
||||
if glow_value < GLOW_THRESHOLD:
|
||||
logger.debug("Glow value is too low, retrying...")
|
||||
continue
|
||||
|
@ -211,8 +225,19 @@ def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
|
|||
device.click(x + w//2, y + h//2)
|
||||
sleep(random.uniform(0.5, 1.5))
|
||||
device.click(x + w//2, y + h//2)
|
||||
return True
|
||||
return False
|
||||
# 体力溢出提示框
|
||||
ret = image.wait_for(R.InPurodyuusu.ButtonConfirm, timeout=1)
|
||||
if ret is not None:
|
||||
logger.info("Skill card confirmation dialog detected")
|
||||
device.click(ret)
|
||||
if return_value == 10:
|
||||
logger.info("No enough AP. Skip this turn")
|
||||
elif return_value == -1:
|
||||
logger.warning("No glowing card found")
|
||||
else:
|
||||
logger.info("Recommended card is Card %d", return_value + 1)
|
||||
return return_value
|
||||
return -1
|
||||
|
||||
@deprecated('此方法待改进')
|
||||
def skill_card_count1():
|
||||
|
@ -303,55 +328,86 @@ def rest():
|
|||
# 确定
|
||||
device.click(image.expect_wait(R.InPurodyuusu.RestConfirmBtn))
|
||||
|
||||
def acquisitions():
|
||||
"""处理行动结束可能需要处理的事件,直到到行动页面为止"""
|
||||
logger.info("Action end stuffs...")
|
||||
AcquisitionType = Literal[
|
||||
"PDrinkAcquire", # P饮料被动领取
|
||||
"PDrinkSelect", # P饮料主动领取
|
||||
"PDrinkMax", # P饮料到达上限
|
||||
"PSkillCardAcquire", # 技能卡被动领取
|
||||
"PSkillCardSelect", # 技能卡主动领取
|
||||
"PItem", # P物品
|
||||
"Clear", # 目标达成
|
||||
]
|
||||
def acquisitions() -> AcquisitionType | None:
|
||||
"""处理行动开始前和结束后可能需要处理的事件,直到到行动页面为止"""
|
||||
img = device.screenshot_raw()
|
||||
gray_img = grayscale(img)
|
||||
logger.info("Acquisition stuffs...")
|
||||
|
||||
# P饮料被动领取
|
||||
logger.info("Check PDrink acquisition...")
|
||||
if image.find(R.InPurodyuusu.PDrinkIcon):
|
||||
if image.raw().find(img, R.InPurodyuusu.PDrinkIcon):
|
||||
logger.info("Click to finish animation")
|
||||
device.click_center()
|
||||
sleep(1)
|
||||
# P物品
|
||||
# logger.info("Check PItem acquisition...")
|
||||
# if image.wait_for(R.InPurodyuusu.PItemIcon, timeout=1):
|
||||
# logger.info("Click to finish animation")
|
||||
# device.click_center()
|
||||
# sleep(1)
|
||||
return "PDrinkAcquire"
|
||||
# P饮料主动领取
|
||||
# if ocr.raw().find(img, contains("受け取るPドリンクを選れでください")):
|
||||
if image.raw().find(img, R.InPurodyuusu.TextPleaseSelectPDrink):
|
||||
logger.info("PDrink acquisition")
|
||||
# 不领取
|
||||
# device.click(ocr.expect(contains("受け取らない")))
|
||||
# sleep(0.5)
|
||||
# device.click(image.expect(R.InPurodyuusu.ButtonNotAcquire))
|
||||
# sleep(0.5)
|
||||
# device.click(image.expect(R.InPurodyuusu.ButtonConfirm))
|
||||
acquire_pdorinku(index=0)
|
||||
return "PDrinkSelect"
|
||||
# P饮料到达上限
|
||||
if image.raw().find(img, R.InPurodyuusu.TextPDrinkMax):
|
||||
device.click(image.expect(R.InPurodyuusu.ButtonLeave))
|
||||
sleep(0.7)
|
||||
# 可能需要点击确认
|
||||
device.click(image.expect(R.InPurodyuusu.ButtonConfirm, threshold=0.8))
|
||||
return "PDrinkMax"
|
||||
# 技能卡被动领取
|
||||
logger.info("Check skill card acquisition...")
|
||||
if image.wait_for_any([
|
||||
if image.raw().find_any(img, [
|
||||
R.InPurodyuusu.PSkillCardIconBlue,
|
||||
R.InPurodyuusu.PSkillCardIconColorful
|
||||
], timeout=1):
|
||||
]):
|
||||
logger.info("Acquire skill card")
|
||||
device.click_center()
|
||||
return "PSkillCardAcquire"
|
||||
# 技能卡主动领取
|
||||
if ocr.find(contains("受け取るスキルカードを選んでください")):
|
||||
if ocr.raw().find(img, contains("受け取るスキルカードを選んでください")):
|
||||
logger.info("Acquire skill card")
|
||||
acquire_skill_card()
|
||||
# P饮料主动领取
|
||||
if ocr.find(contains("受け取るPドリンクを選れでください")):
|
||||
# 不领取
|
||||
device.click(ocr.expect(contains("受け取らない")))
|
||||
sleep(0.5)
|
||||
device.click(image.expect(R.InPurodyuusu.ButtonNotAcquire))
|
||||
sleep(0.5)
|
||||
device.click(image.expect(R.InPurodyuusu.ButtonConfirm))
|
||||
sleep(5)
|
||||
return "PSkillCardSelect"
|
||||
|
||||
# 检测目标达成
|
||||
if ocr.find(contains("達成")):
|
||||
# 目标达成
|
||||
if image.raw().find(gray_img, grayscale_cached(R.InPurodyuusu.IconClearBlue)):
|
||||
logger.debug("達成: clicked")
|
||||
device.click_center()
|
||||
sleep(2)
|
||||
logger.debug("達成: clicked 2")
|
||||
sleep(5)
|
||||
# TODO: 可能不存在 達成 NEXT
|
||||
logger.debug("達成 NEXT: clicked")
|
||||
device.click_center()
|
||||
return "Clear"
|
||||
# P物品
|
||||
if image.raw().find(img, R.InPurodyuusu.PItemIconColorful):
|
||||
logger.info("Click to finish PItem acquisition")
|
||||
device.click_center()
|
||||
sleep(1)
|
||||
return "PItem"
|
||||
# 支援卡
|
||||
# logger.info("Check support card acquisition...")
|
||||
# 记忆
|
||||
# 未跳过剧情
|
||||
return None
|
||||
|
||||
def until_action_scene():
|
||||
"""等待进入行动场景"""
|
||||
# 检测是否到行动页面
|
||||
while not image.wait_for_any([
|
||||
R.InPurodyuusu.TextPDiary, # 普通周
|
||||
|
@ -365,7 +421,14 @@ def until_action_scene():
|
|||
return
|
||||
|
||||
def until_practice_scene():
|
||||
while not image.wait_for(R.InPurodyuusu.TextClearUntil, timeout=1):
|
||||
"""等待进入练习场景"""
|
||||
while image.wait_for(R.InPurodyuusu.TextClearUntil, timeout=1) is None:
|
||||
acquisitions()
|
||||
sleep(1)
|
||||
|
||||
def until_exam_scene():
|
||||
"""等待进入考试场景"""
|
||||
while ocr.find(regex("合格条件|三位以上")) is None:
|
||||
acquisitions()
|
||||
sleep(1)
|
||||
|
||||
|
@ -376,51 +439,34 @@ def practice():
|
|||
no_card_count = 0
|
||||
MAX_NO_CARD_COUNT = 3
|
||||
while True:
|
||||
count = skill_card_count()
|
||||
if count == 0:
|
||||
logger.info("No skill card found. Wait and retry...")
|
||||
no_card_count += 1
|
||||
if no_card_count >= MAX_NO_CARD_COUNT:
|
||||
break
|
||||
sleep(3)
|
||||
with device.pinned():
|
||||
count = skill_card_count()
|
||||
if count == 0:
|
||||
logger.info("No skill card found. Wait and retry...")
|
||||
# no_card_count += 1
|
||||
# if no_card_count >= MAX_NO_CARD_COUNT:
|
||||
# break
|
||||
if not image.find_any([
|
||||
R.InPurodyuusu.TextPerfectUntil,
|
||||
R.InPurodyuusu.TextClearUntil
|
||||
]):
|
||||
logger.info("PERFECTまで/CLEARまで not found. Practice finished.")
|
||||
break
|
||||
sleep(3)
|
||||
continue
|
||||
if click_recommended_card(card_count=count) == -1:
|
||||
logger.info("Click recommended card failed. Retry...")
|
||||
continue
|
||||
if not click_recommended_card(card_count=count):
|
||||
break
|
||||
logger.info("Wait for next turn...")
|
||||
sleep(9) # TODO: 采用更好的方式检测练习结束
|
||||
# 跳过动画
|
||||
logger.info("Recommend card not found. Practice finished.")
|
||||
ocr.expect_wait(contains("上昇"))
|
||||
logger.info("Click to finish 上昇 ")
|
||||
device.click_center()
|
||||
logger.info("Wait practice finish animation...")
|
||||
# # 领取P饮料
|
||||
# sleep(7) # TODO: 采用更好的方式检测动画结束
|
||||
# if image.wait_for(R.InPurodyuusu.PDrinkIcon, timeout=5):
|
||||
# logger.info("Click to finish animation")
|
||||
# device.click_center()
|
||||
# sleep(1)
|
||||
# # 领取技能卡
|
||||
# ocr.wait_for(contains("受け取るスキルカードを選んでください"))
|
||||
# logger.info("Acquire skill card")
|
||||
# acquire_skill_card()
|
||||
|
||||
# # 等待加载动画
|
||||
# loading.wait_loading_start()
|
||||
# logger.info("Loading...")
|
||||
# loading.wait_loading_end()
|
||||
# logger.info("Loading end")
|
||||
|
||||
# # 检测目标达成
|
||||
# if ocr.wait_for(contains("達成"), timeout=5):
|
||||
# logger.debug("達成: clicked")
|
||||
# device.click_center()
|
||||
# sleep(2)
|
||||
# logger.debug("達成: clicked 2")
|
||||
# device.click_center()
|
||||
|
||||
def exam():
|
||||
"""执行考试"""
|
||||
logger.info("Wait for exam scene...")
|
||||
# TODO: 等待考试开始
|
||||
logger.info("Exam started")
|
||||
# 循环打出推荐卡
|
||||
no_card_count = 0
|
||||
|
@ -434,7 +480,7 @@ def exam():
|
|||
break
|
||||
sleep(3)
|
||||
continue
|
||||
if not click_recommended_card(card_count=count):
|
||||
if click_recommended_card(card_count=count) == -1:
|
||||
break
|
||||
sleep(9) # TODO: 采用更好的方式检测练习结束
|
||||
|
||||
|
@ -442,14 +488,81 @@ def exam():
|
|||
device.click(image.expect_wait(R.InPurodyuusu.NextBtn))
|
||||
while ocr.wait_for(contains("メモリー"), timeout=7):
|
||||
device.click_center()
|
||||
# 领取技能卡
|
||||
acquire_skill_card()
|
||||
|
||||
def produce_end():
|
||||
"""执行考试结束"""
|
||||
# 考试结束对话
|
||||
image.expect_wait(R.InPurodyuusu.TextAsariProduceEnd, timeout=30)
|
||||
bottom = (int(device.screen_size[0] / 2), int(device.screen_size[1] * 0.9))
|
||||
device.click(*bottom)
|
||||
sleep(3)
|
||||
device.click_center()
|
||||
sleep(3)
|
||||
device.click(*bottom)
|
||||
sleep(3)
|
||||
device.click(*bottom)
|
||||
sleep(3)
|
||||
|
||||
# MV
|
||||
# 等就可以了,反正又不要自己操作(
|
||||
|
||||
# 结算
|
||||
# 最終プロデュース評価
|
||||
image.expect_wait(R.InPurodyuusu.TextFinalProduceRating, timeout=60 * 2.5)
|
||||
device.click_center()
|
||||
sleep(3)
|
||||
# 次へ
|
||||
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
|
||||
sleep(1)
|
||||
# 決定
|
||||
device.click(image.expect_wait(R.InPurodyuusu.ButtonConfirm, threshold=0.8))
|
||||
sleep(1)
|
||||
# 上传图片。注意网络可能会很慢,可能出现上传失败对话框
|
||||
retry_count = 0
|
||||
MAX_RETRY_COUNT = 5
|
||||
while True:
|
||||
# 处理上传失败
|
||||
if image.find(R.InPurodyuusu.ButtonRetry):
|
||||
logger.info("Upload failed. Retry...")
|
||||
retry_count += 1
|
||||
if retry_count >= MAX_RETRY_COUNT:
|
||||
logger.info("Upload failed. Max retry count reached.")
|
||||
logger.info("Cancel upload.")
|
||||
device.click(image.expect_wait(R.InPurodyuusu.ButtonCancel))
|
||||
sleep(2)
|
||||
continue
|
||||
device.click()
|
||||
# 记忆封面保存失败提示
|
||||
elif image.find(R.InPurodyuusu.ButtonClose):
|
||||
logger.info("Memory cover save failed. Click to close.")
|
||||
device.click()
|
||||
# 结算完毕
|
||||
elif image.find(R.InPurodyuusu.ButtonNextNoIcon):
|
||||
logger.info("Finalize")
|
||||
device.click()
|
||||
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
|
||||
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
|
||||
device.click(image.expect_wait(R.InPurodyuusu.ButtonComplete))
|
||||
# 关注提示
|
||||
# if image.wait_for(R.InPurodyuusu.ButtonFollowProducer, timeout=2):
|
||||
# device.click(image.expect_wait(R.InPurodyuusu.ButtonCancel))
|
||||
break
|
||||
# 开始生成记忆
|
||||
# elif image.find(R.InPurodyuusu.ButtonGenerateMemory):
|
||||
# logger.info("Click generate memory button")
|
||||
# device.click()
|
||||
# 跳过结算内容
|
||||
else:
|
||||
device.click_center()
|
||||
sleep(2)
|
||||
|
||||
|
||||
def hajime_regular(week: int = -1, start_from: int = 0):
|
||||
def hajime_regular(week: int = -1, start_from: int = 1):
|
||||
"""
|
||||
「初」 Regular 模式
|
||||
|
||||
:param week: 第几周,从1开始,-1表示全部
|
||||
:param start_from: 从第几周开始,从1开始。
|
||||
"""
|
||||
def week1():
|
||||
"""
|
||||
|
@ -470,7 +583,6 @@ def hajime_regular(week: int = -1, start_from: int = 0):
|
|||
第二周 期中考试剩余4周\n
|
||||
行动:授業(学习)
|
||||
"""
|
||||
logger.info("Regular week 2 started.")
|
||||
# 点击“授業”
|
||||
rect = image.expect_wait(R.InPurodyuusu.Action.ActionStudy).rect
|
||||
device.click(rect)
|
||||
|
@ -523,7 +635,6 @@ def hajime_regular(week: int = -1, start_from: int = 0):
|
|||
第三周 期中考试剩余3周\n
|
||||
行动:Vo.レッスン、Da.レッスン、Vi.レッスン、授業
|
||||
"""
|
||||
logger.info("Regular week 3 started.")
|
||||
week1()
|
||||
|
||||
def week4():
|
||||
|
@ -531,7 +642,6 @@ def hajime_regular(week: int = -1, start_from: int = 0):
|
|||
第四周 期中考试剩余2周\n
|
||||
行动:おでかけ、相談、活動支給
|
||||
"""
|
||||
logger.info("Regular week 4 started.")
|
||||
week3()
|
||||
|
||||
def week5():
|
||||
|
@ -539,11 +649,9 @@ def hajime_regular(week: int = -1, start_from: int = 0):
|
|||
|
||||
def week6():
|
||||
"""期中考试"""
|
||||
logger.info("Regular week 6 started.")
|
||||
|
||||
def week7():
|
||||
"""第七周 期末考试剩余6周"""
|
||||
logger.info("Regular week 7 started.")
|
||||
if not enter_recommended_action():
|
||||
rest()
|
||||
|
||||
|
@ -552,47 +660,75 @@ def hajime_regular(week: int = -1, start_from: int = 0):
|
|||
第八周 期末考试剩余5周\n
|
||||
行动:授業、活動支給
|
||||
"""
|
||||
logger.info("Regular week 8 started.")
|
||||
if not enter_recommended_action():
|
||||
rest()
|
||||
|
||||
def week_common():
|
||||
if not enter_recommended_action():
|
||||
rest()
|
||||
else:
|
||||
until_action_scene()
|
||||
executed_action = enter_recommended_action()
|
||||
logger.info("Executed recommended action: %s", executed_action)
|
||||
if executed_action == 'lesson':
|
||||
sleep(5)
|
||||
until_practice_scene()
|
||||
practice()
|
||||
elif executed_action == 'rest':
|
||||
pass
|
||||
elif executed_action is None:
|
||||
rest()
|
||||
until_action_scene()
|
||||
|
||||
|
||||
def week_final():
|
||||
if not enter_recommended_action(final_week=True):
|
||||
if enter_recommended_action(final_week=True) != 'lesson':
|
||||
raise ValueError("Failed to enter recommended action on final week.")
|
||||
sleep(5)
|
||||
until_practice_scene()
|
||||
practice()
|
||||
# until_exam_scene()
|
||||
|
||||
|
||||
def week_mid_exam():
|
||||
logger.info("Week mid exam started.")
|
||||
logger.info("Wait for exam scene...")
|
||||
until_exam_scene()
|
||||
logger.info("Exam scene detected.")
|
||||
sleep(5)
|
||||
device.click_center()
|
||||
sleep(5)
|
||||
exam()
|
||||
until_action_scene()
|
||||
|
||||
def week_final_exam():
|
||||
logger.info("Week final exam started.")
|
||||
logger.info("Wait for exam scene...")
|
||||
until_exam_scene()
|
||||
logger.info("Exam scene detected.")
|
||||
sleep(5)
|
||||
device.click_center()
|
||||
sleep(5)
|
||||
exam()
|
||||
produce_end()
|
||||
|
||||
weeks = [
|
||||
week_common, # 1
|
||||
week_common, # 2
|
||||
week_common, # 3
|
||||
week_common, # 4
|
||||
week_final, # 5
|
||||
exam, # 6
|
||||
week_mid_exam, # 6
|
||||
week_common, # 7
|
||||
week_common, # 8
|
||||
week_common, # 9
|
||||
week_common, # 10
|
||||
week_common, # 11
|
||||
week_final, # 12
|
||||
exam, # 13
|
||||
week_final_exam, # 13
|
||||
]
|
||||
if week != -1:
|
||||
logger.info("Week %d started.", week)
|
||||
weeks[week - 1]()
|
||||
else:
|
||||
for w in weeks[start_from-1:]:
|
||||
for i, w in enumerate(weeks[start_from-1:]):
|
||||
logger.info("Week %d started.", i + start_from)
|
||||
w()
|
||||
|
||||
def purodyuusu(
|
||||
|
@ -613,15 +749,18 @@ if __name__ == '__main__':
|
|||
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
|
||||
getLogger('kotonebot').setLevel(logging.DEBUG)
|
||||
getLogger(__name__).setLevel(logging.DEBUG)
|
||||
init_context()
|
||||
|
||||
# produce_end()
|
||||
# exam()
|
||||
# enter_recommended_action()
|
||||
# remaing_turns_and_points()
|
||||
practice()
|
||||
# action_end()
|
||||
until_action_scene()
|
||||
# acquisitions()
|
||||
# acquire_pdorinku(0)
|
||||
# image.wait_for(R.InPurodyuusu.InPractice.PDorinkuIcon)
|
||||
# hajime_regular(start_from=5)
|
||||
# hajime_regular(start_from=1)
|
||||
# until_practice_scene()
|
||||
# device.click(image.expect_wait_any([
|
||||
# R.InPurodyuusu.PSkillCardIconBlue,
|
||||
|
|
|
@ -2,6 +2,7 @@ from time import sleep
|
|||
from logging import getLogger
|
||||
|
||||
from kotonebot import device, ocr, image, Rect
|
||||
from kotonebot.backend.util import contains
|
||||
from .. import R
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
@ -23,7 +24,7 @@ def list_pdorinku() -> list[tuple[str, Rect]]:
|
|||
"""
|
||||
# 截图所有饮料
|
||||
# TODO: 自动记录未知饮料
|
||||
dorinkus = image.find_crop(
|
||||
dorinkus = image.find_crop_many(
|
||||
R.InPurodyuusu.Action.PDorinkuBg,
|
||||
mask=R.InPurodyuusu.Action.PDorinkuBgMask,
|
||||
)
|
||||
|
@ -38,20 +39,28 @@ def acquire_pdorinku(index: int):
|
|||
:param index: 要领取的 P 饮料的索引。从 0 开始。
|
||||
"""
|
||||
# TODO: 随机领取一个饮料改成根据具体情况确定最佳
|
||||
# 点击饮料
|
||||
drinks = list_pdorinku()
|
||||
dorinku = drinks[index]
|
||||
device.click(dorinku[1])
|
||||
logger.debug(f"Pドリンク clicked: {dorinku[0]}")
|
||||
sleep(0.3)
|
||||
# 确定按钮
|
||||
ret = ocr.expect('受け取る')
|
||||
device.click(ret.rect)
|
||||
logger.debug("受け取る clicked")
|
||||
sleep(1.3)
|
||||
# 再次确定
|
||||
device.click_center()
|
||||
logger.debug("再次确定 clicked")
|
||||
# 如果能不领取,就不领取
|
||||
if ocr.find(contains('受け取らない')):
|
||||
device.click()
|
||||
sleep(0.5)
|
||||
device.click(image.expect(R.InPurodyuusu.ButtonNotAcquire))
|
||||
sleep(0.5)
|
||||
device.click(image.expect(R.InPurodyuusu.ButtonConfirm))
|
||||
else:
|
||||
# 点击饮料
|
||||
drinks = list_pdorinku()
|
||||
dorinku = drinks[index]
|
||||
device.click(dorinku[1])
|
||||
logger.debug(f"Pドリンク clicked: {dorinku[0]}")
|
||||
sleep(0.3)
|
||||
# 确定按钮
|
||||
ret = ocr.expect('受け取る')
|
||||
device.click(ret.rect)
|
||||
logger.debug("受け取る clicked")
|
||||
sleep(1.3)
|
||||
# 再次确定
|
||||
device.click_center()
|
||||
logger.debug("再次确定 clicked")
|
||||
|
||||
|
||||
__actions__ = [acquire_pdorinku]
|
||||
|
|
After Width: | Height: | Size: 4.6 KiB |
After Width: | Height: | Size: 4.0 KiB |
After Width: | Height: | Size: 4.2 KiB |
After Width: | Height: | Size: 2.2 KiB |
After Width: | Height: | Size: 3.9 KiB |
After Width: | Height: | Size: 2.1 KiB |
After Width: | Height: | Size: 3.6 KiB |
After Width: | Height: | Size: 11 KiB |
After Width: | Height: | Size: 1.8 KiB |
After Width: | Height: | Size: 12 KiB |
After Width: | Height: | Size: 1.7 KiB |
After Width: | Height: | Size: 11 KiB |
After Width: | Height: | Size: 5.5 KiB |
After Width: | Height: | Size: 2.6 KiB |
After Width: | Height: | Size: 10 KiB |
After Width: | Height: | Size: 2.1 KiB |
After Width: | Height: | Size: 979 KiB |
After Width: | Height: | Size: 959 KiB |
|
@ -1,7 +1,7 @@
|
|||
import unittest
|
||||
|
||||
from kotonebot import _c
|
||||
from kotonebot.tasks.actions.in_purodyuusu import skill_card_count
|
||||
from kotonebot.tasks.actions.in_purodyuusu import skill_card_count, click_recommended_card
|
||||
from util import MockDevice
|
||||
|
||||
|
||||
|
@ -12,6 +12,13 @@ class TestActionInProduce(unittest.TestCase):
|
|||
cls.d = MockDevice()
|
||||
_c.inject_device(cls.d)
|
||||
|
||||
def test_click_recommended_card(self):
|
||||
# 文件命名格式:卡片数量_预期返回值_编号.png
|
||||
self.d.screenshot_path = 'tests/images/produce/recommended_card_3_-1_0.png'
|
||||
self.assertEqual(click_recommended_card(timeout=1), -1)
|
||||
self.d.screenshot_path = 'tests/images/produce/recommended_card_4_3_0.png'
|
||||
self.assertEqual(click_recommended_card(timeout=1), 0)
|
||||
|
||||
def test_current_skill_card_count(self):
|
||||
cards_1 = 'tests/images/produce/in_produce_cards_1.png'
|
||||
cards_2 = 'tests/images/produce/in_produce_cards_2.png'
|
||||
|
|
|
@ -5,21 +5,21 @@ import cv2
|
|||
from cv2.typing import MatLike
|
||||
|
||||
from kotonebot.backend.util import Rect
|
||||
from kotonebot.client.protocol import DeviceProtocol
|
||||
from kotonebot.client.protocol import DeviceABC
|
||||
|
||||
class MockDevice(DeviceProtocol):
|
||||
class MockDevice(DeviceABC):
|
||||
def __init__(
|
||||
self,
|
||||
screenshot_path: str = '',
|
||||
):
|
||||
self.screenshot_path = screenshot_path
|
||||
self.screenshot_hook = None
|
||||
self.screenshot_hook_after = None
|
||||
|
||||
@override
|
||||
def screenshot(self) -> MatLike:
|
||||
img = cv2.imread(self.screenshot_path)
|
||||
if self.screenshot_hook is not None:
|
||||
img = self.screenshot_hook(img)
|
||||
if self.screenshot_hook_after is not None:
|
||||
img = self.screenshot_hook_after(img)
|
||||
return img
|
||||
|
||||
@staticmethod
|
||||
|
|