initial commit

XcantloadX 2025-01-06 09:42:03 +08:00
commit 0c7574124e
101 changed files with 3561 additions and 0 deletions

177
.gitignore vendored Normal file

@@ -0,0 +1,177 @@
##########################
tests/output_images
tests/output_images/*
R.py
##########################
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# PyPI configuration file
.pypirc

6
.vscode/extensions.json vendored Normal file

@@ -0,0 +1,6 @@
{
"recommendations": [
"rioj7.command-variable",
"wholroyd.jinja"
]
}

24
.vscode/launch.json vendored Normal file

@@ -0,0 +1,24 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Python: Current Module",
"type": "python",
"request": "launch",
"console": "integratedTerminal",
"module": "${command:extension.commandvariable.file.relativeDirDots}.${fileBasenameNoExtension}",
}
]
}

13
.vscode/settings.json vendored Normal file

@@ -0,0 +1,13 @@
{
"python.analysis.autoImportCompletions": true,
"python.analysis.typeCheckingMode": "basic",
"python.testing.unittestArgs": [
"-v",
"-s",
"./tests",
"-p",
"test*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
}
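
These settings point VS Code's test runner at `unittest` discovery over `./tests`. A minimal sketch of the equivalent run outside VS Code (assuming it is executed from the repository root):

```python
# Mirrors the unittest args configured above: -v -s ./tests -p "test*.py"
import unittest

suite = unittest.defaultTestLoader.discover("./tests", pattern="test*.py")
unittest.TextTestRunner(verbosity=2).run(suite)
```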

12
.vscode/tasks.json vendored Normal file

@@ -0,0 +1,12 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"label": "Clear test results",
"type": "shell",
"command": "powershell \"rm -r tests/output_images\""
}
]
}

11
README.md Normal file

@@ -0,0 +1,11 @@
## Development
```bash
git clone https://github.com/XcantloadX/KotonesAutoAssistant.git
cd KotonesAutoAssistant
python -m venv venv
source venv/bin/activate  # on Windows: venv\Scripts\activate
pip install -r requirements.txt
pip install -r requirements.dev.txt
python tools/make_resources.py
```
Then open the VS Code settings, search for “Support Restructured Text”, and enable it.

BIN
a.png Normal file

Binary file not shown.

Size: 935 B

125
detect_card.py Normal file

@@ -0,0 +1,125 @@
import cv2
import numpy as np
def detect_glowing_card(image_path):
# Read the image
img = cv2.imread(image_path)
# Convert to the HSV color space
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
# 1. Detect the card contours first
# Convert to grayscale
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Gaussian blur to reduce noise
blurred = cv2.GaussianBlur(gray, (7, 7), 0)
# Canny edge detection
edges = cv2.Canny(blurred, 50, 150)
# Dilate the edges to make contours more prominent
dilated = cv2.dilate(edges, None, iterations=2)
cv2.imshow('Dilated Edges', dilated)
cv2.waitKey(0)
# Find the contours
contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# Filter candidate card contours
card_contours = []
min_card_area = 5000 # minimum card area
max_card_area = 50000 # maximum card area
for contour in contours:
area = cv2.contourArea(contour)
if min_card_area < area < max_card_area:
# Compute the contour's main features
peri = cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, 0.02 * peri, True)
# Compute the minimum-area bounding rectangle
rect = cv2.minAreaRect(contour)
box = cv2.boxPoints(rect)
box = np.int0(box)
# Compute the contour's shape features
aspect_ratio = rect[1][0] / rect[1][1] if rect[1][1] != 0 else 0
# Check whether it matches card characteristics
if 0.5 < aspect_ratio < 2.0: # reasonable aspect ratio
card_contours.append(contour)
# 2. Create the card mask
card_mask = np.zeros_like(gray)
cv2.drawContours(card_mask, card_contours, -1, (255), -1)
# 3. Detect the yellow glow
# Define the HSV range for yellow
lower_yellow = np.array([20, 100, 100])
upper_yellow = np.array([30, 255, 255])
# Create the yellow mask
yellow_mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 4. Combine the card mask and the yellow mask
final_mask = cv2.bitwise_and(yellow_mask, card_mask)
# 5. Analyze each card
glow_scores = []
card_regions = []
for contour in card_contours:
# Get the card's bounding box
x, y, w, h = cv2.boundingRect(contour)
card_regions.append((x, y, x+w, y+h))
# Compute the glow score within this region
region_mask = final_mask[y:y+h, x:x+w]
score = np.sum(region_mask > 0)
glow_scores.append(score)
# 6. Find the glowing card
if glow_scores:
glowing_card_index = np.argmax(glow_scores)
# Mark the result on the original image
result = img.copy()
for i, (x1, y1, x2, y2) in enumerate(card_regions):
color = (0, 255, 0) if i == glowing_card_index else (0, 0, 255)
cv2.rectangle(result, (x1, y1), (x2, y2), color, 2)
return {
'glowing_card_index': glowing_card_index,
'glow_scores': glow_scores,
'result_image': result,
'card_mask': card_mask,
'yellow_mask': yellow_mask,
'final_mask': final_mask
}
else:
return None
def display_results(results):
if results is None:
print("未检测到卡片")
return
# Show the results of every processing step
cv2.imshow('Original with Detection', results['result_image'])
cv2.imshow('Card Mask', results['card_mask'])
cv2.imshow('Yellow Mask', results['yellow_mask'])
cv2.imshow('Final Mask', results['final_mask'])
print(f"发光卡片序号: {results['glowing_card_index']}")
print(f"各卡片发光得分: {results['glow_scores']}")
cv2.waitKey(0)
cv2.destroyAllWindows()
def main():
image_path = r"C:\Users\user\Downloads\Snipaste_2024-12-26_10-11-58.png" # 替换为实际图像路径
results = detect_glowing_card(image_path)
display_results(results)
if __name__ == '__main__':
main()

216
detect_card_simple.py Normal file

@@ -0,0 +1,216 @@
import cv2
import numpy as np
# Get the screen size via the Win32 API
import ctypes
user32 = ctypes.windll.user32
user32.SetProcessDPIAware()
screen = user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
ignore_show = False
def show(title, image):
img_h, img_w = image.shape[:2]
scale_x = screen[0] * 0.9 / img_w
scale_y = screen[1] * 0.9 / img_h
if scale_x < 1 or scale_y < 1:
scale = min(scale_x, scale_y)
resized = cv2.resize(image, (0, 0), fx=scale, fy=scale)
else:
resized = image
if not ignore_show:
cv2.imshow(title, resized)
cv2.waitKey(0)
cv2.destroyWindow(title)
# Read the image
# image = cv2.imread(r"C:\Users\user\Downloads\1735195113729.jpg")
# image = cv2.imread(r"./test_images/1.jpg")
image = cv2.imread(r"./test_images/test1.png")
original = image.copy()
# Convert to grayscale and blur to reduce noise
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# blurred = cv2.GaussianBlur(gray, (5, 5), 0)
# _, binary = cv2.threshold(original, 150, 255, cv2.THRESH_BINARY)
_, binary = cv2.threshold(gray, 190, 255, cv2.THRESH_BINARY)
kernel = np.ones((2, 2), np.uint8) # kernel size; adjust to fit the input
erosion = cv2.erode(binary, kernel, iterations=7)
dilation = cv2.dilate(binary, kernel, iterations=10) # dilation
# closing = cv2.morphologyEx(dilation, cv2.MORPH_CLOSE, kernel) # closing operation
# Edge detection
edges = cv2.Canny(binary, 150, 300)
show("Binary", binary)
# show("erosion", erosion)
# show("dilation", dilation)
show("Edges", edges)
cv2.waitKey(0)
# Find all contours
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# Close the contours
# closed_contours = []
# for contour in contours:
# epsilon = 0.01 * cv2.arcLength(contour, True)
# approx = cv2.approxPolyDP(contour, epsilon, True)
# closed_contours.append(approx)
# Sort contours by length, keeping only the top 10
# contours = sorted(contours, key=lambda c: cv2.arcLength(c, False), reverse=True)[:10]
# # # Keep only straight-line contours
# filtered_contours = []
# for contour in contours:
# # Fit a line using least squares
# [vx, vy, x, y] = cv2.fitLine(contour, cv2.DIST_L2, 0, 0.01, 0.01)
# # Compute the contour's angle
# angle = np.arctan2(vy, vx) * 180 / np.pi
# # Keep the contour if its angle is close to horizontal or vertical
# if abs(angle) < 30 or abs(angle - 90) < 30:
# filtered_contours.append(contour)
# contours = filtered_contours
# # Remove contours that are too short
# min_contour_length = 150 # minimum contour length threshold
# filtered_contours = [contour for contour in contours if cv2.arcLength(contour, True) > min_contour_length]
# contours = filtered_contours
# Draw all closed contours
cv2.drawContours(original, contours, -1, (0, 255, 0), 2)
show("Contours", original)
cv2.waitKey(0)
cv2.destroyAllWindows()
# Create a blank image for drawing contours
contour_image = np.zeros_like(image)
# Draw all contours
cv2.drawContours(contour_image, contours, -1, (255, 255, 255), 1)
# Show the contour-only image
show("Contours Only", contour_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# Stores the glow value of each card
card_glow_values = []
card_contours = []
# Iterate over contours and filter out card regions
for contour in contours:
area = cv2.contourArea(contour)
if area > 800: # ignore small regions
# Approximate the contour shape
peri = cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, 0.02 * peri, True)
# Compute the bounding rectangle
x, y, w, h = cv2.boundingRect(approx)
card_region = image[y:y+h, x:x+w]
# Draw and show this region
# cv2.rectangle(original, (x, y), (x + w, y + h), (0, 0, 255), 2)
# show("Card Region pre", card_region)
# cv2.waitKey(0)
# Show the contour alongside
# Create a tight black background for the contour
x, y, w, h = cv2.boundingRect(approx)
padding = 50
contour_img = np.zeros((h + padding*2, w + padding*2, 3), dtype=np.uint8)
# Adjust contour coordinates for padding
shifted_contour = approx.copy()
shifted_contour[:,:,0] = approx[:,:,0] - x + padding
shifted_contour[:,:,1] = approx[:,:,1] - y + padding
# Draw contour on black background
# cv2.drawContours(contour_img, [shifted_contour], -1, (0, 255, 0), 2)
# show("Contour on Black", contour_img)
# cv2.waitKey(0)
# Condition 1: acceptable aspect ratio
aspect_ratio = w / float(h)
TARGET_ASPECT_RATIO_RANGE = (0.73, 0.80)
if not (TARGET_ASPECT_RATIO_RANGE[0] < aspect_ratio < TARGET_ASPECT_RATIO_RANGE[1]):
continue
# Condition 3: color requirement
# Average color of the 40x40 squares at the region's bottom-right and bottom-left corners
bottom_right = card_region[-40:, -40:]
bottom_left = card_region[-40:, :40]
avg_color_br = np.mean(bottom_right, axis=(0, 1))
avg_color_bl = np.mean(bottom_left, axis=(0, 1))
# Check whether both are approximately #f0f0f0
TARGET_COLOR = (240, 240, 240)
# Draw and show this region
cv2.rectangle(original, (x, y), (x + w, y + h), (0, 0, 255), 2)
# Draw the average color onto a preview (see the commented putText below)
show("Card Region pre", card_region)
# preview = cv2.putText(original.copy(), str(avg_color_br), (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
# show("Card Region", preview)
# cv2.waitKey(0)
if not (
np.allclose(avg_color_br, TARGET_COLOR, atol=5)
and np.allclose(avg_color_bl, TARGET_COLOR, atol=5)
):
continue
if TARGET_ASPECT_RATIO_RANGE[0] < aspect_ratio < TARGET_ASPECT_RATIO_RANGE[1]:
# Extract the card region
card_region = image[y:y+h, x:x+w]
# Convert to the HSV color space
hsv = cv2.cvtColor(card_region, cv2.COLOR_BGR2HSV)
# Define the HSV thresholds for yellow
lower_yellow = np.array([20, 100, 100])
upper_yellow = np.array([30, 255, 255])
# Create a mask to extract the yellow region
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# Compute the glow value (total number of yellow pixels)
glow_value = cv2.countNonZero(mask)
# Save the card contour and its glow value
card_glow_values.append(glow_value)
card_contours.append((x, y, w, h))
# Draw the filtered bounding boxes
for contour in card_contours:
x, y, w, h = contour
cv2.rectangle(original, (x, y), (x + w, y + h), (255, 0, 0), 5)
# Find the card with the highest glow value
if card_glow_values:
max_glow_index = np.argmax(card_glow_values)
max_glow_card = card_contours[max_glow_index]
# Draw the outline of the card with the highest glow value
x, y, w, h = max_glow_card
cv2.rectangle(original, (x, y), (x + w, y + h), (0, 255, 0), 5)
cv2.putText(original, "Max Glow", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# Show the result
show("Detected Cards", original)
cv2.waitKey(0)
cv2.destroyAllWindows()

67
detect_yellow.py Normal file

@@ -0,0 +1,67 @@
import cv2
import numpy as np
# Read the image
# image = cv2.imread(r"C:\Users\user\Downloads\Snipaste_2024-12-26_10-11-58.png")
# image = cv2.imread(r"C:\Users\user\Downloads\1735194517471.jpg")
image = cv2.imread(r"C:\Users\user\Downloads\1735195113729.jpg")
original = image.copy()
# Convert to the HSV color space
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
# Define the HSV thresholds for yellow
lower_yellow = np.array([20, 100, 100])
upper_yellow = np.array([60, 255, 255])
# Use a thresholded image as the binary base
binary = cv2.threshold(original, 220, 255, cv2.THRESH_BINARY)[1]
# Create a mask to extract the yellow region
mask = cv2.inRange(cv2.cvtColor(binary, cv2.COLOR_BGR2HSV), lower_yellow, upper_yellow)
# Get the image height and width
height, width = image.shape[:2]
# Compute the width of each region
region_width = width // 3
# Build masks for the three regions
regions = []
yellow_pixels = []
for i in range(3):
start_x = i * region_width
end_x = (i + 1) * region_width
region_mask = mask[:, start_x:end_x]
regions.append(region_mask)
yellow_pixels.append(cv2.countNonZero(region_mask))
# Find the region with the most yellow pixels
max_yellow_region = yellow_pixels.index(max(yellow_pixels))
# Crop that region from the original image
start_x = max_yellow_region * region_width
end_x = (max_yellow_region + 1) * region_width
cropped_image = original[:, start_x:end_x]
# Show the region containing the most yellow
cv2.imshow("Region with most yellow", cropped_image)
# Print the yellow pixel count of each region
for i, count in enumerate(yellow_pixels):
print(f"Region {i + 1}: {count} yellow pixels")
# Check whether one region stands out
max_count = max(yellow_pixels)
avg_count = sum(yellow_pixels) / len(yellow_pixels)
if max_count > avg_count * 1.5: # if the maximum exceeds 1.5x the average
print(f"Region {yellow_pixels.index(max_count) + 1} has significantly more yellow pixels")
else:
print("No region has significantly more yellow pixels")
# Show the results
cv2.imshow("Yellow Mask", mask)
cv2.imshow("Binary", binary)
cv2.waitKey(0)

BIN
dorinku_0.png Normal file

Binary file not shown.

Size: 9.4 KiB

BIN
dorinku_1.png Normal file

Binary file not shown.

Size: 6.7 KiB

BIN
dorinku_2.png Normal file

Binary file not shown.

Size: 7.2 KiB

29
fuzz.py Normal file

@@ -0,0 +1,29 @@
from thefuzz import fuzz
import time
import random
import string
def generate_random_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
# Generate test data
count = 100000
test_strings = [(generate_random_string(10), generate_random_string(10)) for _ in range(count)]
# Benchmark plain string comparison
start_time = time.time()
for s1, s2 in test_strings:
a = (s1 == s2)
str_compare_time = time.time() - start_time
# Benchmark fuzz.ratio
start_time = time.time()
for s1, s2 in test_strings:
fuzz.ratio(s1, s2)
fuzz_time = time.time() - start_time
print(f"字符串比较耗时: {str_compare_time:.4f}")
print(f"fuzz.ratio耗时: {fuzz_time:.4f}")
print(f"fuzz.ratio比字符串比较慢 {fuzz_time/str_compare_time:.1f}")
print(fuzz.ratio("Da.レッスン", "Daレッスン"))

20
imgui_test.py Normal file

@@ -0,0 +1,20 @@
import cv2
from kotonebot.client.device.fast_screenshot import AdbFastScreenshots
with AdbFastScreenshots(
adb_path=r"D:\SDK\Android\platform-tools\adb.exe",
device_serial="127.0.0.1:16384",
time_interval=179,
width=720,
height=1280,
bitrate="5M",
use_busybox=False,
connect_to_device=True,
screenshotbuffer=10,
go_idle=0,
) as adbscreen:
for image in adbscreen:
cv2.imshow("CV2 WINDOW", image)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
cv2.destroyAllWindows()

15
kotonebot/__init__.py Normal file

@@ -0,0 +1,15 @@
from kotonebot.client.protocol import DeviceProtocol
from .backend.context import ContextOcr, ContextImage, ContextDebug, _c
from .backend.util import Rect, fuzz, regex, contains
device: DeviceProtocol
ocr: ContextOcr
image: ContextImage
debug: ContextDebug
def __getattr__(name: str):
try:
return getattr(_c, name)
except AttributeError:
# Module __getattr__ is only invoked when the name is not in the module dict,
# so fall through with an AttributeError rather than a KeyError from globals().
raise AttributeError(f"module 'kotonebot' has no attribute {name!r}") from None
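
The module re-exports the context singletons, and `__getattr__` forwards any other name to the `Context` instance `_c`. A usage sketch (assuming a device is reachable, since the context connects to ADB on import; the OCR target string is illustrative):

```python
from kotonebot import device, ocr, contains

screen = device.screenshot()               # capture via the active device
hit = ocr.find(screen, contains("CLEAR"))  # OCR lookup on that image
if hit is not None:
    device.click(hit.rect)                 # click the matched text region
```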

307
kotonebot/backend/context.py Normal file

@@ -0,0 +1,307 @@
import os
import re
import time
from functools import cache
from datetime import datetime
from typing import Callable, TYPE_CHECKING, cast, overload, Any, TypeVar, Literal
from kotonebot.client.device.adb import AdbDevice
import cv2
if TYPE_CHECKING:
from cv2.typing import MatLike
import kotonebot.backend.image as raw_image
from kotonebot.backend.image import CropResult, TemplateMatchResult, find_crop, expect, find
from kotonebot.backend.util import Rect
from kotonebot.client import DeviceProtocol
from kotonebot.backend.ocr import Ocr, OcrResult, jp, en, StringMatchFunction
OcrLanguage = Literal['jp', 'en']
T = TypeVar('T')
class ContextOcr:
def __init__(self, context: 'Context'):
self.context = context
self.__engine = jp
def raw(self, lang: OcrLanguage) -> Ocr:
"""
Returns the Ocr object from `kotonebot.backend.ocr`.\n
Unlike this object, which screenshots the device automatically, the raw Ocr object requires the image to be passed in manually.
"""
match lang:
case 'jp':
return jp
case 'en':
return en
case _:
raise ValueError(f"Invalid language: {lang}")
@overload
def ocr(self) -> list[OcrResult]:
"""OCR 当前设备画面。"""
...
@overload
def ocr(self, img: 'MatLike') -> list[OcrResult]:
"""OCR 指定图像。"""
...
def ocr(self, img: 'MatLike | None' = None) -> list[OcrResult]:
"""OCR 当前设备画面或指定图像。"""
if img is None:
return self.__engine.ocr(self.context.device.screenshot())
return self.__engine.ocr(img)
@overload
def find(self, pattern: str | re.Pattern | StringMatchFunction) -> OcrResult | None:
...
@overload
def find(self, img: 'MatLike', pattern: str | re.Pattern | StringMatchFunction) -> OcrResult | None:
...
def find(self, *args, **kwargs) -> OcrResult | None:
"""检查指定图像是否包含指定文本。"""
if len(args) == 1 and len(kwargs) == 0:
return self.__engine.find(self.context.device.screenshot(), args[0])
elif len(args) == 2 and len(kwargs) == 0:
return self.__engine.find(args[0], args[1])
else:
raise ValueError("Invalid arguments")
def expect(
self,
pattern: str | re.Pattern | StringMatchFunction
) -> OcrResult:
"""
Check whether the current device screen contains the given text.
Unlike `find()`, `expect()` raises an exception when the text is not found.
"""
return self.__engine.expect(self.context.device.screenshot(), pattern)
def expect_wait(self, pattern: str | re.Pattern | StringMatchFunction, timeout: float = 10) -> OcrResult:
"""
Wait for the given text to appear.
"""
start_time = time.time()
while True:
result = self.find(pattern)
if result is not None:
return result
if time.time() - start_time > timeout:
raise TimeoutError(f"Timeout waiting for {pattern}")
time.sleep(0.1)
def wait_for(self, pattern: str | re.Pattern | StringMatchFunction, timeout: float = 10) -> OcrResult | None:
"""
Wait for the given text to appear.
"""
start_time = time.time()
while True:
result = self.find(pattern)
if result is not None:
return result
if time.time() - start_time > timeout:
return None
time.sleep(0.1)
class ContextImage:
def __init__(self, context: 'Context', crop_rect: Rect | None = None):
self.context = context
self.crop_rect = crop_rect
def raw(self):
return raw_image
def wait_for(self, template: str, mask: str | None = None, threshold: float = 0.9, timeout: float = 10) -> bool:
"""
Wait for the given image to appear.
"""
start_time = time.time()
while True:
if self.find(template, mask, threshold):
return True
if time.time() - start_time > timeout:
return False
time.sleep(0.1)
def wait_for_any(self, templates: list[str], masks: list[str | None] | None = None, threshold: float = 0.9, timeout: float = 10):
"""
Wait for any of the given images to appear.
"""
if masks is None:
_masks = [None] * len(templates)
else:
_masks = masks
start_time = time.time()
while True:
for template, mask in zip(templates, _masks):
if self.find(template, mask, threshold):
return True
if time.time() - start_time > timeout:
return False
time.sleep(0.1)
def expect_wait(
self,
template: str,
mask: str | None = None,
threshold: float = 0.9,
timeout: float = 10
) -> TemplateMatchResult:
"""
Wait for the given image to appear.
"""
start_time = time.time()
while True:
ret = self.find(template, mask, threshold)
if ret is not None:
return ret
if time.time() - start_time > timeout:
raise TimeoutError(f"Timeout waiting for {template}")
time.sleep(0.1)
def expect_wait_any(
self,
templates: list[str],
masks: list[str | None] | None = None,
threshold: float = 0.9,
timeout: float = 10
) -> TemplateMatchResult:
"""
Wait for any of the given images to appear.
"""
if masks is None:
_masks = [None] * len(templates)
else:
_masks = masks
start_time = time.time()
while True:
for template, mask in zip(templates, _masks):
ret = self.find(template, mask, threshold)
if ret is not None:
return ret
if time.time() - start_time > timeout:
raise TimeoutError(f"Timeout waiting for any of {templates}")
time.sleep(0.1)
def expect(self, template: str, mask: str | None = None, threshold: float = 0.9) -> TemplateMatchResult:
"""
Find the given image.
Unlike `find()`, `expect()` raises an exception when the image is not found.
"""
return expect(self.context.device.screenshot(), template, mask, threshold=threshold)
def find(self, template: str, mask: str | None = None, threshold: float = 0.9):
"""
Find the given image.
"""
return find(self.context.device.screenshot(), template, mask, threshold=threshold)
def find_crop(
self,
template: str,
mask: str | None = None,
threshold: float = 0.999,
) -> list[CropResult]:
"""
Find the given template in the current device screen and crop out the matches.
"""
return find_crop(
self.context.device.screenshot(),
template,
mask,
threshold=threshold,
)
class ContextGlobalVars:
def __init__(self):
self.auto_collect: bool = False
"""遇到未知P饮料/卡片时,是否自动截图并收集"""
self.debug: bool = True
class ContextDebug:
def __init__(self, context: 'Context'):
self.__context = context
self.save_images: bool = False
self.save_images_dir: str = "debug_images"
def show(self, img: 'MatLike', title: str = "Debug"):
if not self.__context.vars.debug:
return
if self.save_images:
if not os.path.exists(self.save_images_dir):
os.makedirs(self.save_images_dir)
now = datetime.now()
time_str = now.strftime("%Y-%m-%d %H-%M-%S") + f".{now.microsecond // 1000:03d}"
cv2.imwrite(f"{self.save_images_dir}/{title}_{time_str}.png", img)
cv2.imshow(title, img)
cv2.waitKey(1)
@cache
def _forward_from(getter: Callable[[], T]) -> T:
class Forwarded:
def __getattr__(self, name: str) -> Any:
return getattr(getter(), name)
def __repr__(self) -> str:
return f"Forwarded({object})"
return cast(T, Forwarded())
class Context:
def __init__(self):
# HACK: hard-coded for now
from adbutils import adb
adb.connect('127.0.0.1:16384')
self.__device = AdbDevice(adb.device_list()[0])
self.__ocr = ContextOcr(self)
self.__image = ContextImage(self)
self.__vars = ContextGlobalVars()
self.__debug = ContextDebug(self)
self.actions = []
def inject_device(self, device: DeviceProtocol):
self.__device = device
@property
def device(self) -> DeviceProtocol:
return cast(DeviceProtocol, _forward_from(lambda: self.__device))
@property
def ocr(self) -> 'ContextOcr':
return cast(ContextOcr, _forward_from(lambda: self.__ocr))
@property
def image(self) -> 'ContextImage':
return cast(ContextImage, _forward_from(lambda: self.__image))
@property
def vars(self) -> 'ContextGlobalVars':
return cast(ContextGlobalVars, _forward_from(lambda: self.__vars))
@property
def debug(self) -> 'ContextDebug':
return cast(ContextDebug, _forward_from(lambda: self.__debug))
# Expose Context attributes at the module level
_c = Context()
device: DeviceProtocol = _c.device
"""The device currently executing the task."""
ocr: ContextOcr = _c.ocr
"""OCR engine."""
image: ContextImage = _c.image
"""Image recognition."""
vars: ContextGlobalVars = _c.vars
"""Global variables."""
debug: ContextDebug = _c.debug
"""Debugging utilities."""
# def __getattr__(name: str) -> Any:
# return getattr(_c, name)
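
Because `_forward_from` returns a forwarding proxy, the module-level `device`, `ocr`, etc. stay valid even after `inject_device()` swaps the backing object: every attribute access re-reads the current field. A sketch (the `FakeDevice` name is illustrative, and it assumes the hard-coded ADB connect in `Context.__init__` succeeded at import time):

```python
from kotonebot.backend.context import _c, device

class FakeDevice:
    """Stand-in device; only what this sketch touches is implemented."""
    def screenshot(self):
        return None  # a real implementation returns a cv2 image

_c.inject_device(FakeDevice())
# `device` is a Forwarded proxy, so this call dispatches to FakeDevice:
img = device.screenshot()
```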

184
kotonebot/backend/image.py Normal file

@@ -0,0 +1,184 @@
from typing import NamedTuple, Protocol, TypeVar
from logging import getLogger
import cv2
import numpy as np
from cv2.typing import MatLike, Rect, Point, Size
logger = getLogger(__name__)
class TemplateNotFoundError(Exception):
"""模板未找到异常。"""
def __init__(self, image: MatLike, template: MatLike | str):
self.image = image
self.template = template
super().__init__(f"Template not found: {template}")
class ResultProtocol(Protocol):
score: float
position: Point
class TemplateMatchResult(NamedTuple):
score: float
position: Point
"""结果位置。左上角坐标。"""
size: Size
"""输入模板的大小。宽高。"""
@property
def rect(self) -> Rect:
"""结果区域。左上角坐标和宽高。"""
return (self.position[0], self.position[1], self.size[0], self.size[1])
@property
def right_bottom(self) -> Point:
"""结果右下角坐标。"""
return (self.position[0] + self.size[0], self.position[1] + self.size[1])
class CropResult(NamedTuple):
score: float
position: Point
size: Size
image: MatLike
@property
def rect(self) -> Rect:
return (self.position[0], self.position[1], self.size[0], self.size[1])
def _unify_image(image: MatLike | str) -> MatLike:
if isinstance(image, str):
image = cv2.imread(image)
return image
T = TypeVar('T')
# TODO: this function is too slow and needs optimization
def _remove_duplicate_matches(
matches: list[T],
offset: int = 10
) -> list[T]:
result = []
# TODO: fix the typing issues in this function
for match in matches:
if any(abs(match.position[0] - r.position[0]) < offset for r in result): # type: ignore
continue
result.append(match)
return result
def template_match(
template: MatLike | str,
image: MatLike | str,
mask: MatLike | str | None = None,
transparent: bool = False,
threshold: float = 0.8,
max_results: int = 5,
remove_duplicate: bool = True,
) -> list[TemplateMatchResult]:
"""
Find the positions of a template within an image.
.. note::
The `mask` and `transparent` parameters cannot be used together.
:param template: The template image; either a file path or a cv2.Mat.
:param image: The image to search; either a file path or a cv2.Mat.
:param mask: The mask image; either a file path or a cv2.Mat.
:param transparent: If True, the template is treated as transparent and converted into a mask automatically.
:param threshold: Match threshold. Defaults to 0.8.
:param max_results: Maximum number of results. Defaults to 5.
:param remove_duplicate: Whether to remove duplicate matches. Defaults to True.
"""
logger.debug(f'match template: {template} threshold: {threshold} max_results: {max_results}')
# Normalize the arguments
template = _unify_image(template)
image = _unify_image(image)
if mask is not None:
mask = _unify_image(mask)
mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)[1]
# Run template matching
if mask is not None:
# https://stackoverflow.com/questions/35642497/python-opencv-cv2-matchtemplate-with-transparency
# When matching with a mask, the TM_CCORR_NORMED method must be used
result = cv2.matchTemplate(image, template, cv2.TM_CCORR_NORMED, mask=mask)
else:
result = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
# Collect all matches above the threshold
locations = list(zip(*np.where(result >= threshold)))
# Convert to a list of TemplateMatchResult
matches = []
for y, x in locations:
h, w = template.shape[:2]
score = float(result[y, x])
matches.append(TemplateMatchResult(score=score, position=(int(x), int(y)), size=(int(w), int(h))))
# Sort by score and limit the number of results
matches.sort(key=lambda x: x.score, reverse=True)
if max_results > 0:
matches = matches[:max_results]
return matches
def find_crop(
image: MatLike | str,
template: MatLike | str,
mask: MatLike | str | None = None,
transparent: bool = False,
threshold: float = 0.8,
) -> list[CropResult]:
"""
Find the given template (with an optional mask) and crop out each match.
"""
matches = template_match(template, image, mask, transparent, threshold, max_results=-1)
matches = _remove_duplicate_matches(matches)
return [CropResult(
match.score,
match.position,
match.size,
image[match.rect[1]:match.rect[1]+match.rect[3], match.rect[0]:match.rect[0]+match.rect[2]] # type: ignore
) for match in matches]
def find(
image: MatLike,
template: MatLike | str,
mask: MatLike | str | None = None,
transparent: bool = False,
threshold: float = 0.8,
) -> TemplateMatchResult | None:
"""寻找一个模板图像"""
matches = template_match(template, image, mask, transparent, threshold, max_results=-1)
return matches[0] if len(matches) > 0 else None
def count(
image: MatLike,
template: MatLike | str,
mask: MatLike | str | None = None,
transparent: bool = False,
threshold: float = 0.9,
remove_duplicate: bool = True,
) -> int:
results = template_match(template, image, mask, transparent, threshold, max_results=-1)
if remove_duplicate:
results = _remove_duplicate_matches(results)
# Draw the results (debug)
# for result in results:
# cv2.rectangle(image, result.rect, (0, 0, 255), 2)
# cv2.imshow('count', image)
# cv2.waitKey(0)
return len(results)
def expect(
image: MatLike,
template: MatLike | str,
mask: MatLike | str | None = None,
transparent: bool = False,
threshold: float = 0.9,
) -> TemplateMatchResult:
ret = find(image, template, mask, transparent, threshold)
if ret is None:
raise TemplateNotFoundError(image, template)
return ret
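
A usage sketch for the matching helpers (file names are placeholders): `find` returns the best `TemplateMatchResult` above the threshold or `None`, while `expect` raises `TemplateNotFoundError` instead of returning `None`.

```python
import cv2
from kotonebot.backend.image import find, expect, TemplateNotFoundError

screen = cv2.imread("screenshot.png")              # placeholder path
match = find(screen, "button.png", threshold=0.8)  # placeholder template
if match is not None:
    print("score:", match.score, "rect:", match.rect)

try:
    expect(screen, "button.png")  # raises instead of returning None
except TemplateNotFoundError:
    print("template not on screen")
```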

111
kotonebot/backend/ocr.py Normal file

@@ -0,0 +1,111 @@
import re
import time
import unicodedata
from os import PathLike
from typing import TYPE_CHECKING, Callable, NamedTuple, overload
from kotonebot.backend.util import Rect
if TYPE_CHECKING:
from cv2.typing import MatLike
from rapidocr_onnxruntime import RapidOCR
_engine_jp = RapidOCR(
rec_model_path=r'res\models\japan_PP-OCRv3_rec_infer.onnx',
use_det=True,
use_cls=False,
use_rec=True,
)
_engine_en = RapidOCR(
rec_model_path=r'res\models\en_PP-OCRv3_rec_infer.onnx',
use_det=True,
use_cls=False,
use_rec=True,
)
StringMatchFunction = Callable[[str], bool]
class OcrResult(NamedTuple):
text: str
rect: Rect
confidence: float
class TextNotFoundError(Exception):
def __init__(self, pattern: str | re.Pattern | StringMatchFunction, image: 'MatLike'):
self.pattern = pattern
self.image = image
if isinstance(pattern, (str, re.Pattern)):
super().__init__(f"Expected text not found: {pattern}")
else:
super().__init__(f"Expected text not found: {pattern.__name__}")
def _is_match(text: str, pattern: re.Pattern | str | StringMatchFunction) -> bool:
if isinstance(pattern, re.Pattern):
return pattern.match(text) is not None
elif callable(pattern):
return pattern(text)
else:
return text == pattern
class Ocr:
def __init__(self, engine: RapidOCR):
self.__engine = engine
# TODO: consider caching OCR results to avoid repeated calls.
def ocr(self, img: 'MatLike') -> list[OcrResult]:
"""
OCR a cv2 image. Note: **full-width characters in the result are converted to half-width characters**.
:return: All recognition results
"""
img_content = img
result, elapse = self.__engine(img_content)
if result is None:
return []
return [OcrResult(
text=unicodedata.normalize('NFKC', r[1]).replace('ą', 'a'), # HACK: results occasionally contain stray symbols; strip them for now
rect=(
int(r[0][0][0]), # top-left x
int(r[0][0][1]), # top-left y
int(r[0][2][0] - r[0][0][0]), # width = bottom-right x - top-left x # type: ignore
int(r[0][2][1] - r[0][0][1]), # height = bottom-right y - top-left y # type: ignore
),
confidence=r[2] # type: ignore
) for r in result] # type: ignore
def find(self, img: 'MatLike', text: str | re.Pattern | StringMatchFunction) -> OcrResult | None:
"""
Find the given text.
:return: The match, or None if the text was not found.
"""
for result in self.ocr(img):
if _is_match(result.text, text):
return result
return None
def expect(self, img: 'MatLike', text: str | re.Pattern | StringMatchFunction) -> OcrResult:
"""
Find the given text; raises an exception if it is not found.
"""
ret = self.find(img, text)
if ret is None:
raise TextNotFoundError(text, img)
return ret
jp = Ocr(_engine_jp)
"""Japanese OCR engine."""
en = Ocr(_engine_en)
"""English OCR engine."""
if __name__ == '__main__':
from pprint import pprint as print
import cv2
img_path = 'test_images/acquire_pdorinku.png'
img = cv2.imread(img_path)
result1 = jp.ocr(img)
print(result1)

75
kotonebot/backend/util.py Normal file

@@ -0,0 +1,75 @@
from functools import lru_cache
import re
import typing
from typing import NamedTuple, Callable
from cv2.typing import MatLike
from thefuzz import fuzz as _fuzz
class TaskInfo(NamedTuple):
name: str
description: str
entry: Callable[[], None]
Rect = typing.Sequence[int]
"""左上X, 左上Y, 宽度, 高度"""
def is_rect(rect: typing.Any) -> bool:
return isinstance(rect, typing.Sequence) and len(rect) == 4 and all(isinstance(i, int) for i in rect)
@lru_cache(maxsize=1000)
def fuzz(text: str) -> Callable[[str], bool]:
"""返回 fuzzy 算法的字符串匹配函数。"""
f = lambda s: _fuzz.ratio(s, text) > 90
f.__repr__ = lambda: f"fuzzy({text})"
f.__name__ = f"fuzzy({text})"
return f
@lru_cache(maxsize=1000)
def regex(regex: str) -> Callable[[str], bool]:
"""返回正则表达式字符串匹配函数。"""
f = lambda s: re.match(regex, s) is not None
f.__repr__ = lambda: f"regex({regex})"
f.__name__ = f"regex({regex})"
return f
@lru_cache(maxsize=1000)
def contains(text: str) -> Callable[[str], bool]:
"""返回包含指定文本的函数。"""
f = lambda s: text in s
f.__repr__ = lambda: f"contains({text})"
f.__name__ = f"contains({text})"
return f
def crop(img: MatLike, x1: float, y1: float, x2: float, y2: float) -> MatLike:
"""按比例裁剪图像"""
h, w = img.shape[:2]
x1_px = int(w * x1)
y1_px = int(h * y1)
x2_px = int(w * x2)
y2_px = int(h * y2)
return img[y1_px:y2_px, x1_px:x2_px]
def crop_y(img: MatLike, y1: float, y2: float) -> MatLike:
"""按比例垂直裁剪图像"""
h, _ = img.shape[:2]
y1_px = int(h * y1)
y2_px = int(h * y2)
return img[y1_px:y2_px, :]
def crop_x(img: MatLike, x1: float, x2: float) -> MatLike:
"""按比例水平裁剪图像"""
_, w = img.shape[:2]
x1_px = int(w * x1)
x2_px = int(w * x2)
return img[:, x1_px:x2_px]
def cropper(x1: float, y1: float, x2: float, y2: float) -> Callable[[MatLike], MatLike]:
return lambda img: crop(img, x1, y1, x2, y2)
def cropper_y(y1: float, y2: float) -> Callable[[MatLike], MatLike]:
return lambda img: crop_y(img, y1, y2)
def cropper_x(x1: float, x2: float) -> Callable[[MatLike], MatLike]:
return lambda img: crop_x(img, x1, x2)
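
The three matcher factories return `str -> bool` predicates that plug directly into `Ocr.find`/`Ocr.expect`. A small sketch with illustrative inputs:

```python
from kotonebot.backend.util import fuzz, regex, contains

is_lesson = fuzz("Da.レッスン")  # fuzzy: thefuzz ratio must exceed 90
is_number = regex(r"\d+")       # regex: re.match against the string
has_clear = contains("CLEAR")   # plain substring test

print(is_lesson("Daレッスン"))  # likely True: small OCR noise is tolerated
print(is_number("12 turns"))    # True: digits at the start
print(has_clear("CLEARまで"))   # True
```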

1
kotonebot/client/README.md Normal file

@@ -0,0 +1 @@
This folder contains client-side operation code: starting and stopping emulators, controlling devices, and so on.

1
kotonebot/client/__init__.py Normal file

@@ -0,0 +1 @@
from .protocol import DeviceProtocol


93
kotonebot/client/device/adb.py Normal file

@@ -0,0 +1,93 @@
from typing import Callable, cast
import cv2
import numpy as np
from cv2.typing import MatLike
from kotonebot.backend.util import Rect, is_rect
from ..protocol import DeviceProtocol, ClickableObjectProtocol
from adbutils import AdbClient, adb
from adbutils._device import AdbDevice as Device
class AdbDevice(DeviceProtocol):
def __init__(self, device: Device) -> None:
self.device = device
self.screenshot_hook: Callable[[MatLike], MatLike] | None = None
def launch_app(self, package_name: str) -> None:
self.device.shell(f"monkey -p {package_name} 1")
def click(self, arg1, arg2=None) -> None:
if is_rect(arg1):
self.__click_rect(arg1)
elif isinstance(arg1, int) and isinstance(arg2, int):
self.__click_point(arg1, arg2)
elif isinstance(arg1, ClickableObjectProtocol):
self.__click_clickable(arg1)
else:
raise ValueError(f"Invalid arguments: {arg1}, {arg2}")
def __click_rect(self, rect: Rect) -> None:
# Pick a random point within the central 60% of the rectangle
x = rect[0] + rect[2] // 2 + np.random.randint(-int(rect[2] * 0.3), int(rect[2] * 0.3))
y = rect[1] + rect[3] // 2 + np.random.randint(-int(rect[3] * 0.3), int(rect[3] * 0.3))
x = int(x)
y = int(y)
self.click(x, y)
def __click_point(self, x: int, y: int) -> None:
self.device.shell(f"input tap {x} {y}")
def __click_clickable(self, clickable: ClickableObjectProtocol) -> None:
self.click(clickable.rect)
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: int = 100) -> None:
self.device.shell(f"input swipe {x1} {y1} {x2} {y2} {duration}")
def screenshot(self) -> MatLike:
img = cv2.cvtColor(np.array(self.device.screenshot()), cv2.COLOR_RGB2BGR)
if self.screenshot_hook is not None:
img = self.screenshot_hook(img)
return img
@property
def screen_size(self) -> tuple[int, int]:
ret = cast(str, self.device.shell("wm size")).strip('Physical size: ')
spiltted = tuple(map(int, ret.split("x")))
if len(spiltted) != 2:
raise ValueError(f"Invalid screen size: {ret}")
return spiltted
@staticmethod
def list_devices() -> list[str]:
raise NotImplementedError
if __name__ == "__main__":
print("server version:", adb.server_version())
adb.connect("127.0.0.1:16384", )
print("devices:", adb.device_list())
d = adb.device_list()[0]
dd = AdbDevice(d)
# dd.launch_app("com.android.settings")
# Show the screen in real time
import cv2
import numpy as np
while True:
img = dd.screenshot()
# img = cv2.imdecode(np.frombuffer(img, np.uint8), cv2.IMREAD_COLOR)
# img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
cv2.imshow("screen", img)
# Scale down to 25%
img = cv2.resize(img, (img.shape[1] // 4, img.shape[0] // 4))
# Get the current time
import time
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# Draw the time on the image
font = cv2.FONT_HERSHEY_SIMPLEX
cv2.putText(img, current_time, (10, 30), font, 1, (0, 255, 0), 2, cv2.LINE_AA)
cv2.waitKey(1)

378
kotonebot/client/device/fast_screenshot.py Normal file

@@ -0,0 +1,378 @@
# type: ignore
# source: https://github.com/hansalemaos/adbnativeblitz
# license: MIT
# requires: av
import base64
import ctypes
import os
import platform
import signal
import subprocess
import sys
import threading
from collections import deque
from functools import cache
import av
from time import sleep as sleep_
from math import floor
def sleep(secs):
try:
if secs == 0:
return
maxrange = 50 * secs
if isinstance(maxrange, float):
sleeplittle = floor(maxrange)
sleep_((maxrange - sleeplittle) / 50)
maxrange = int(sleeplittle)
if maxrange > 0:
for _ in range(maxrange):
sleep_(0.016)
except KeyboardInterrupt:
return
iswindows = "win" in platform.platform().lower()
if iswindows:
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
creationflags = subprocess.CREATE_NO_WINDOW
invisibledict = {
"startupinfo": startupinfo,
"creationflags": creationflags,
"start_new_session": True,
}
from ctypes import wintypes
windll = ctypes.LibraryLoader(ctypes.WinDLL)
kernel32 = windll.kernel32
_GetShortPathNameW = kernel32.GetShortPathNameW
_GetShortPathNameW.argtypes = [wintypes.LPCWSTR, wintypes.LPWSTR, wintypes.DWORD]
_GetShortPathNameW.restype = wintypes.DWORD
else:
invisibledict = {}
@cache
def get_short_path_name(long_name):
try:
if not iswindows:
return long_name
output_buf_size = 4096
output_buf = ctypes.create_unicode_buffer(output_buf_size)
_ = _GetShortPathNameW(long_name, output_buf, output_buf_size)
return output_buf.value
except Exception as e:
sys.stderr.write(f"{e}\n")
return long_name
def killthread(threadobject):
# based on https://pypi.org/project/kthread/
if not threadobject.is_alive():
return True
tid = -1
for tid1, tobj in threading._active.items():
if tobj is threadobject:
tid = tid1
break
if tid == -1:
sys.stderr.write(f"{threadobject} not found")
return False
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(tid), ctypes.py_object(SystemExit)
)
if res == 0:
return False
elif res != 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
return False
return True
def send_ctrl_commands(pid, command=0):
if iswindows:
commandstring = r"""import ctypes, sys; CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT = 0, 1, 2, 3, 4; kernel32 = ctypes.WinDLL("kernel32", use_last_error=True); (lambda pid, cmdtosend=CTRL_C_EVENT: [kernel32.FreeConsole(), kernel32.AttachConsole(pid), kernel32.SetConsoleCtrlHandler(None, 1), kernel32.GenerateConsoleCtrlEvent(cmdtosend, 0), sys.exit(0) if isinstance(pid, int) else None])(int(sys.argv[1]), int(sys.argv[2]) if len(sys.argv) > 2 else None) if __name__ == '__main__' else None"""
subprocess.Popen(
[sys.executable, "-c", commandstring, str(pid), str(command)],
**invisibledict,
)
else:
os.kill(pid, signal.SIGINT)
class StopDescriptor:
def __get__(self, instance, owner):
return instance.__dict__[self.name]
def __set__(self, instance, value):
if not value:
instance.__dict__[self.name] = False
else:
instance.__dict__[self.name] = True
instance.stop_capture()
def __delete__(self, instance):
sys.stderr.write("Cannot be deleted!")
def __set_name__(self, owner, name):
self.name = name
class AdbFastScreenshots:
stop_recording = StopDescriptor()
def __init__(
self,
adb_path,
device_serial,
time_interval=179,
width=1600,
height=900,
bitrate="20M",
use_busybox=False,
connect_to_device=True,
screenshotbuffer=10,
go_idle=0,
):
r"""Capture Android device screen using ADB's screenrecord with high frame rate.
This class allows capturing the screen of an Android device using ADB's screenrecord
command with an improved frame rate. It continuously captures frames from the device
and provides them as NumPy arrays to the caller.
Args:
adb_path (str): The path to the ADB executable.
device_serial (str): The serial number of the target Android device.
time_interval (int): The maximum duration, in seconds, for each screen recording session (up to a maximum of 180 seconds). After reaching this time limit, a new recording session automatically starts without causing interruptions to the user experience.
width (int): The width of the captured screen.
height (int): The height of the captured screen.
bitrate (str): The bitrate for screen recording (e.g., "20M" for 20Mbps).
use_busybox (bool): Whether to use BusyBox for base64 encoding.
connect_to_device (bool): Whether to connect to the device using ADB.
screenshotbuffer (int): The size of the frame buffer to store the last captured frames.
go_idle (float): The idle time (in seconds) when no new frames are available. # higher value -> less fps, but also less CPU usage.
Attributes:
stop_recording (bool): Control attribute to stop the screen capture.
Methods:
stop_capture(): Stops the screen capture.
Usage:
import cv2
from adbnativeblitz import AdbFastScreenshots
with AdbFastScreenshots(
adb_path=r"C:\Android\android-sdk\platform-tools\adb.exe",
device_serial="127.0.0.1:5555",
time_interval=179,
width=1600,
height=900,
bitrate="20M",
use_busybox=False,
connect_to_device=True,
screenshotbuffer=10,
go_idle=0,
) as adbscreen:
for image in adbscreen:
cv2.imshow("CV2 WINDOW", image)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
cv2.destroyAllWindows()
Note:
- The `AdbFastScreenshots` class should be used in a context manager (`with` statement).
- The `stop_capture()` method can be called to stop the screen capture.
- The frames are continuously captured and provided in the form of NumPy arrays.
- The class aims to achieve a higher frame rate by avoiding slow subprocess creation
for each screen capture session.
"""
self.stop_recording = False
self.size = f"{width}x{height}"
self.width = width
self.height = height
self.timelimit = time_interval
self.bitrate = bitrate
self.use_busybox = use_busybox
self.adb_path = get_short_path_name(adb_path)
self.device_serial = device_serial
if connect_to_device:
subprocess.run(
[self.adb_path, "connect", self.device_serial], **invisibledict
)
self.threadlock = threading.Lock()
self.codec = av.codec.CodecContext.create("h264", "r")
self.lastframes = deque([], screenshotbuffer)
self.command_to_execute = (
f"""#!/bin/bash
startscreenrecord() {{
screenrecord --output-format=h264 --time-limit "$1" --size "$2" --bit-rate "$3" -
}}
time_interval={self.timelimit}
size="{self.size}"
bitrate="{self.bitrate}"
#screenrecord --output-format=h264 --time-limit 1 --size "$size" --bit-rate "$bitrate" -
while true; do
startscreenrecord $time_interval "$size" "$bitrate"
done"""
+ "\n"
)
self.base64cmd = self.format_adb_command(
self.command_to_execute,
su=False,
exitcommand="",
errors="strict",
)
self.p = None
self.threadstdout = None
self.framecounter = 0
self.go_idle = go_idle
def format_adb_command(
self,
cmd,
su=False,
exitcommand="DONE",
errors="strict",
):
if su:
cmd = f"su -- {cmd}"
if exitcommand:
cmd = cmd.rstrip() + f"\necho {exitcommand}\n"
nolimitcommand = []
base64_command = base64.standard_b64encode(cmd.encode("utf-8", errors)).decode(
"utf-8", errors
)
nolimitcommand.extend(["echo", base64_command, "|"])
if self.use_busybox:
nolimitcommand.extend(["busybox"])
nolimitcommand.extend(["base64", "-d", "|", "sh"])
return " ".join(nolimitcommand) + "\n"
def _start_capturing(self):
def _execute_stdout_read():
try:
for q in iter(self.p.stdout.readline, b""):
if iswindows:
q = q.replace(b"\r\n", b"\n")
if q:
alldata.append(q)
if alldata:
joineddata = b"".join(alldata)
try:
packets = self.codec.parse(joineddata)
if packets:
for pack in packets:
frames = self.codec.decode(pack)
for frame in frames:
nparray = (
frame.to_rgb()
.reformat(
width=self.width,
height=self.height,
format="bgr24",
)
.to_ndarray()
)
try:
self.threadlock.acquire()
self.lastframes.append(nparray)
self.framecounter += 1
finally:
try:
self.threadlock.release()
except Exception as e:
sys.stderr.write(f"{e}\n")
alldata.clear()
except Exception as e:
sys.stderr.write(f"{e}\n")
except Exception as e:
sys.stderr.write(f"{e}\n")
self.p = subprocess.Popen(
[self.adb_path, "-s", self.device_serial, "shell", self.base64cmd],
stderr=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stdin=subprocess.DEVNULL,
bufsize=0,
**invisibledict,
)
alldata = []
self.threadstdout = threading.Thread(target=_execute_stdout_read)
self.threadstdout.daemon = True
self.threadstdout.start()
def _stop_capture(self):
try:
if iswindows:
subprocess.Popen(f"taskkill /F /PID {self.p.pid} /T", **invisibledict)
except:
pass
try:
self.p.stdout.close()
except:
pass
try:
killthread(self.threadstdout)
except:
pass
def stop_capture(self):
send_ctrl_commands(self.p.pid, command=0)
try:
sleep(1)
except KeyboardInterrupt:
pass
self._stop_capture()
def __iter__(self):
oldframecounter = 0
self._start_capturing()
sleep(0.05)
while not self.stop_recording:
if not self.lastframes:
sleep(0.005)
continue
yield self.lastframes[-1].copy()
if oldframecounter == self.framecounter:
if self.go_idle:
sleep(self.go_idle)
oldframecounter = self.framecounter
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.stop_recording = True
if __name__ == "__main__":
import cv2
with AdbFastScreenshots(
adb_path=r"D:\SDK\Android\platform-tools\adb.exe",
device_serial="127.0.0.1:16384",
time_interval=179,
width=720,
height=1280,
bitrate="20M",
use_busybox=False,
connect_to_device=True,
screenshotbuffer=10,
go_idle=0,
) as adbscreen:
for image in adbscreen:
cv2.imshow("CV2 WINDOW", image)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
cv2.destroyAllWindows()

2
kotonebot/client/host/README.md Normal file

@@ -0,0 +1,2 @@
The `client.host` module provides the platform code that hosts devices, usually a local emulator.
It is mainly responsible for creating, listing, connecting to, closing, and deleting devices.


143
kotonebot/client/protocol.py Normal file

@@ -0,0 +1,143 @@
from time import sleep
from typing import Callable, Protocol, TYPE_CHECKING, overload, runtime_checkable
from cv2.typing import MatLike
from kotonebot.backend.util import Rect, is_rect
@runtime_checkable
class ClickableObjectProtocol(Protocol):
"""
Protocol for clickable objects.
"""
@property
def rect(self) -> Rect:
...
class DeviceScreenshotProtocol(Protocol):
def screenshot(self) -> MatLike:
"""
Take a screenshot.
"""
...
class HookContextManager:
def __init__(self, device: 'DeviceProtocol', func: Callable[[MatLike], MatLike]):
self.device = device
self.func = func
self.old_func = device.screenshot_hook
def __enter__(self):
self.device.screenshot_hook = self.func
return self
def __exit__(self, exc_type, exc_value, traceback):
self.device.screenshot_hook = self.old_func
class DeviceProtocol(Protocol):
"""
Protocol/interface describing the operations available on a single device.
"""
screenshot_hook: Callable[[MatLike], MatLike] | None
@staticmethod
def list_devices() -> list[str]:
...
def launch_app(self, package_name: str) -> None:
"""
Launch an app by its package name.
"""
...
@overload
def click(self, x: int, y: int) -> None:
"""
Click a point on the screen.
"""
...
@overload
def click(self, rect: Rect) -> None:
"""
Pick a random point inside the given rectangle and click it.
"""
...
@overload
def click(self, clickable: ClickableObjectProtocol) -> None:
"""
Click the given clickable object.
"""
...
def click_center(self) -> None:
"""
Click the center of the screen.
"""
x, y = self.screen_size[0] // 2, self.screen_size[1] // 2
self.click(x, y)
@overload
def double_click(self, x: int, y: int, interval: float = 0.5) -> None:
"""
Double-click a point on the screen.
"""
...
@overload
def double_click(self, rect: Rect, interval: float = 0.5) -> None:
"""
Double-click inside the given rectangle.
"""
...
@overload
def double_click(self, clickable: ClickableObjectProtocol, interval: float = 0.5) -> None:
"""
Double-click the given clickable object.
"""
...
def double_click(self, *args, **kwargs) -> None:
arg0 = args[0]
if is_rect(arg0) or isinstance(arg0, ClickableObjectProtocol):
rect = arg0
interval = kwargs.get('interval', 0.5)
self.click(rect)
sleep(interval)
self.click(rect)
else:
x = args[0]
y = args[1]
interval = kwargs.get('interval', 0.5)
self.click(x, y)
sleep(interval)
self.click(x, y)
def swipe(self, x1: int, y1: int, x2: int, y2: int) -> None:
"""
Swipe the screen.
"""
...
def screenshot(self) -> MatLike:
"""
Take a screenshot.
"""
...
def hook(self, func: Callable[[MatLike], MatLike]) -> HookContextManager:
"""
Register a hook; it will be called to process every screenshot before it is returned.
"""
return HookContextManager(self, func)
@property
def screen_size(self) -> tuple[int, int]:
"""
Screen size.
"""
...
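
A minimal sketch of a `DeviceProtocol` implementation for offline tests (the `FakeDevice` name and behavior are illustrative, not part of this commit); `click_center`, `double_click`, and `hook` are inherited from the protocol's default bodies:

```python
import numpy as np
from cv2.typing import MatLike
from kotonebot.client.protocol import DeviceProtocol

class FakeDevice(DeviceProtocol):
    """Black-frame device, useful for exercising task logic offline."""
    def __init__(self) -> None:
        self.screenshot_hook = None

    @staticmethod
    def list_devices() -> list[str]:
        return ["fake"]

    def launch_app(self, package_name: str) -> None:
        print(f"launch {package_name}")

    def click(self, *args) -> None:
        print(f"click {args}")

    def swipe(self, x1: int, y1: int, x2: int, y2: int) -> None:
        print(f"swipe ({x1},{y1}) -> ({x2},{y2})")

    def screenshot(self) -> MatLike:
        img = np.zeros((1280, 720, 3), dtype=np.uint8)
        if self.screenshot_hook is not None:
            img = self.screenshot_hook(img)
        return img

    @property
    def screen_size(self) -> tuple[int, int]:
        return (720, 1280)
```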


@@ -0,0 +1,637 @@
import random
import re
import time
from typing_extensions import deprecated
import numpy as np
import cv2
import unicodedata
import logging
from time import sleep
from kotonebot import ocr, device, fuzz, contains, image, debug, regex
from kotonebot.backend.util import crop_y, cropper_y
from kotonebot.tasks import R
from kotonebot.tasks.actions import loading
from kotonebot.tasks.actions.pdorinku import acquire_pdorinku
logger = logging.getLogger(__name__)
def enter_recommended_action(final_week: bool = False) -> bool:
"""
Execute the recommended action on the action-selection screen.
:param final_week: Whether this is the review week before the exam.
:return: Whether the recommended action was executed successfully.
"""
# Get the lesson
logger.debug("Waiting for recommended lesson...")
with device.hook(cropper_y(0.00, 0.30)):
ret = ocr.wait_for(regex('ボーカル|ダンス|ビジュアル|休|体力'))
logger.debug("ocr.wait_for: %s", ret)
if ret is None:
return False
if not final_week:
if "ボーカル" in ret.text:
lesson_text = "Vo.レッスン"
elif "ダンス" in ret.text:
lesson_text = "Da.レッスン"
elif "ビジュアル" in ret.text:
lesson_text = "Vi.レッスン"
elif "" in ret.text or "体力" in ret.text:
rest()
return True
else:
return False
logger.info("Rec. lesson: %s", lesson_text)
# Click the lesson
logger.debug("Try clicking lesson...")
lesson_ret = ocr.expect(contains(lesson_text))
device.double_click(lesson_ret.rect)
return True
else:
if "ボーカル" in ret.text:
template = R.InPurodyuusu.ButtonFinalPracticeVocal
elif "ダンス" in ret.text:
template = R.InPurodyuusu.ButtonFinalPracticeDance
elif "ビジュアル" in ret.text:
template = R.InPurodyuusu.ButtonFinalPracticeVisual
else:
return False
logger.debug("Try clicking lesson...")
device.double_click(image.expect_wait(template))
return True
def before_start_action():
"""检测支援卡剧情、领取资源等"""
raise NotImplementedError()
def click_recommended_card(timeout: float = 7, card_count: int = 3) -> bool:
"""Click the recommended card.
:param timeout: Timeout in seconds.
:param card_count: Number of cards (1-4).
:return: True if a card (or the skip button) was clicked, False on timeout.
"""
import cv2
import numpy as np
from cv2.typing import MatLike
# Detection parameters
TARGET_ASPECT_RATIO_RANGE = (0.73, 0.80)
TARGET_COLOR = (240, 240, 240)
YELLOW_LOWER = np.array([20, 100, 100])
YELLOW_UPPER = np.array([30, 255, 255])
GLOW_EXTENSION = 10 # number of pixels to extend outward
GLOW_THRESHOLD = 1200 # glow-value threshold
# Fixed card coordinates (for 720x1280)
CARD_POSITIONS_1 = [
(264, 883, 192, 252)
]
CARD_POSITIONS_2 = [
(156, 883, 192, 252),
(372, 883, 192, 252),
# delta_x = 216, delta_x-width = 24
]
CARD_POSITIONS_3 = [
(47, 883, 192, 252), # left card (x, y, w, h)
(264, 883, 192, 252), # middle card
(481, 883, 192, 252) # right card
# delta_x = 217, delta_x-width = 25
]
CARD_POSITIONS_4 = [
(17, 883, 192, 252),
(182, 883, 192, 252),
(346, 883, 192, 252),
(511, 883, 192, 252),
# delta_x = 165, delta_x-width = -27
]
SKIP_POSITION = (621, 739, 85, 85)
@deprecated('Pending improvement')
def calc_pos(card_count: int):
# Compute the actual positions from the card count
CARD_PAD = 25
CARD_SCREEN_PAD = 17
card_positions = []
# Compute the card positions
if card_count == 1:
card_positions = [CARD_POSITIONS_3[1]] # use only the middle position
else:
# Original spacing between cards
card_spacing = CARD_POSITIONS_3[1][0] - CARD_POSITIONS_3[0][0]
card_width = CARD_POSITIONS_3[0][2]
# Usable screen width
screen_width = 720
available_width = screen_width - (CARD_SCREEN_PAD * 2)
# Total width when using the original spacing
original_total_width = (card_count - 1) * card_spacing + card_width
# Decide whether an overlapping layout is needed
if original_total_width > available_width:
spacing = (available_width - card_width * card_count - CARD_SCREEN_PAD * 2) // (card_count)
start_x = CARD_SCREEN_PAD
else:
spacing = card_spacing
start_x = (screen_width - original_total_width) // 2
# Generate all card positions
x = start_x
for i in range(card_count):
y = CARD_POSITIONS_3[0][1]
w = CARD_POSITIONS_3[0][2]
h = CARD_POSITIONS_3[0][3]
card_positions.append((round(x), round(y), round(w), round(h)))
x += spacing + card_width
return card_positions
def calc_pos2(card_count: int):
if card_count == 1:
return CARD_POSITIONS_1
elif card_count == 2:
return CARD_POSITIONS_2
elif card_count == 3:
return CARD_POSITIONS_3
elif card_count == 4:
return CARD_POSITIONS_4
else:
raise ValueError(f"Unsupported card count: {card_count}")
logger.debug("等待截图...")
start_time = time.time()
while time.time() - start_time < timeout:
img = device.screenshot()
# Detect the cards
card_glows = []
for x, y, w, h in calc_pos2(card_count) + [SKIP_POSITION]:
# Coordinates of the extended card region
outer_x = max(0, x - GLOW_EXTENSION)
outer_y = max(0, y - GLOW_EXTENSION)
outer_w = w + (GLOW_EXTENSION * 2)
outer_h = h + (GLOW_EXTENSION * 2)
# Grab the outer and inner regions
outer_region = img[outer_y:y+h+GLOW_EXTENSION, outer_x:x+w+GLOW_EXTENSION]
inner_region = img[y:y+h, x:x+w]
# Build the masks
outer_hsv = cv2.cvtColor(outer_region, cv2.COLOR_BGR2HSV)
inner_hsv = cv2.cvtColor(inner_region, cv2.COLOR_BGR2HSV)
# Yellow portions of the outer and inner regions
outer_mask = cv2.inRange(outer_hsv, YELLOW_LOWER, YELLOW_UPPER)
inner_mask = cv2.inRange(inner_hsv, YELLOW_LOWER, YELLOW_UPPER)
# Ring mask: only the extended band around the card counts toward the glow value
ring_mask = outer_mask.copy()
ring_mask[GLOW_EXTENSION:GLOW_EXTENSION+h, GLOW_EXTENSION:GLOW_EXTENSION+w] = 0
# Glow value of the ring region
glow_value = cv2.countNonZero(ring_mask)
card_glows.append((x, y, w, h, glow_value))
# Find the card with the highest glow value
if not card_glows:
logger.debug("No glowing card found, retrying...")
continue
else:
max_glow_card = max(card_glows, key=lambda x: x[4])
x, y, w, h, glow_value = max_glow_card
if glow_value < GLOW_THRESHOLD:
logger.debug("Glow value is too low, retrying...")
continue
# Click the center of the card
logger.debug(f"Click glowing card at: ({x + w//2}, {y + h//2})")
device.click(x + w//2, y + h//2)
sleep(random.uniform(0.5, 1.5))
device.click(x + w//2, y + h//2)
return True
return False
@deprecated('Pending improvement')
def skill_card_count1():
"""Count the skill cards currently held."""
img = device.screenshot()
img = crop_y(img, 0.83, 0.90)
# Grayscale
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# White -> black
# Replace only pure white (255) with black (0); keep other values unchanged
img[img == 255] = 0
# Binarize
_, img = cv2.threshold(img, 240, 255, cv2.THRESH_BINARY)
debug.show(img)
ret = ocr.raw('en').ocr(img)
# 统计字母 A、M 数量
count = 0
for item in ret:
if 'A' in item.text or 'M' in item.text:
count += 1
logger.info("Current skill card count: %d", count)
return count
def skill_card_count():
"""获取当前持有的技能卡数量"""
img = device.screenshot()
img = crop_y(img, 0.83, 0.90)
count = image.raw().count(img, R.InPurodyuusu.A, threshold=0.85)
count += image.raw().count(img, R.InPurodyuusu.M, threshold=0.85)
logger.info("Current skill card count: %d", count)
return count
def remaining_turns_and_points():
"""Get the remaining turn count and points."""
ret = ocr.ocr()
logger.debug("ocr.ocr: %s", ret)
def index_of(text: str) -> int:
for i, item in enumerate(ret):
# e.g. full-width ＣＬＥＡＲまで -> CLEARまで after NFKC normalization
if text == unicodedata.normalize('NFKC', item.text):
return i
return -1
turns_tip_index = index_of("残りターン数")
points_tip_index = index_of("CLEARまで")
turns_rect = ret[turns_tip_index].rect
# Extend 100 px downward
turns_rect_extended = (
turns_rect[0], # x
turns_rect[1], # y
turns_rect[2], # width
turns_rect[3] + 100 # height + 100
)
# Crop and run OCR again
turns_img = device.screenshot()[
turns_rect_extended[1]:turns_rect_extended[1]+turns_rect_extended[3],
turns_rect_extended[0]:turns_rect_extended[0]+turns_rect_extended[2]
]
turns_ocr = ocr.ocr(turns_img)
logger.debug("turns_ocr: %s", turns_ocr)
def acquire_skill_card():
"""获取技能卡(スキルカード)"""
# TODO: 识别卡片内容,而不是固定选卡
# TODO: 不硬编码坐标
CARD_POSITIONS = [
(157, 820, 128, 128),
(296, 820, 128, 128),
(435, 820, 128, 128),
]
logger.info("Click first skill card")
device.click(CARD_POSITIONS[0])
sleep(0.5)
# Confirm
logger.info("Click 受け取る")
device.click(ocr.expect(contains("受け取る")).rect)
# Skip the animation
device.click(image.expect_wait_any([
R.InPurodyuusu.PSkillCardIconBlue,
R.InPurodyuusu.PSkillCardIconColorful
]))
def rest():
    """Take a rest for this week."""
    logger.info("Rest for this week.")
    # Click the rest button
    device.click(image.expect_wait(R.InPurodyuusu.Rest))
    # Confirm
    device.click(image.expect_wait(R.InPurodyuusu.RestConfirmBtn))
def acquisitions():
    """Handle the events that may appear after an action ends, until back at the action scene."""
    logger.info("Handle action end events...")
    # P-drink, passive acquisition
    logger.info("Check PDrink acquisition...")
    if image.find(R.InPurodyuusu.PDrinkIcon):
        logger.info("Click to finish animation")
        device.click_center()
        sleep(1)
    # P-item
    # logger.info("Check PItem acquisition...")
    # if image.wait_for(R.InPurodyuusu.PItemIcon, timeout=1):
    #     logger.info("Click to finish animation")
    #     device.click_center()
    #     sleep(1)
    # Skill card, passive acquisition
    logger.info("Check skill card acquisition...")
    if image.wait_for_any([
        R.InPurodyuusu.PSkillCardIconBlue,
        R.InPurodyuusu.PSkillCardIconColorful
    ], timeout=1):
        logger.info("Acquire skill card")
        device.click_center()
    # Skill card, active selection
    if ocr.find(contains("受け取るスキルカードを選んでください")):
        logger.info("Acquire skill card")
        acquire_skill_card()
    # P-drink, active selection
    if ocr.find(contains("受け取るPドリンクを選んでください")):
        # Decline the drink
        device.click(ocr.expect(contains("受け取らない")))
        sleep(0.5)
        device.click(image.expect(R.InPurodyuusu.ButtonNotAcquire))
        sleep(0.5)
        device.click(image.expect(R.InPurodyuusu.ButtonConfirm))
    # Goal reached ("達成") check
    if ocr.find(contains("達成")):
        logger.debug("達成: clicked")
        device.click_center()
        sleep(2)
        logger.debug("達成: clicked 2")
        device.click_center()
    # Support card
    # logger.info("Check support card acquisition...")
    # Memory
    # Story not skipped
def until_action_scene():
    """Keep handling events until the action scene is reached."""
    # Detect whether we are at the action scene
    while not image.wait_for_any([
        R.InPurodyuusu.TextPDiary,               # normal week
        R.InPurodyuusu.ButtonFinalPracticeDance  # one week before the exam
    ], timeout=1):
        logger.info("Action scene not detected. Retry...")
        acquisitions()
        sleep(1)
    logger.info("Now at action scene.")
def until_practice_scene():
    """Keep handling events until the practice scene is reached."""
    while not image.wait_for(R.InPurodyuusu.TextClearUntil, timeout=1):
        acquisitions()
        sleep(1)
def practice():
    """Run a practice session."""
    logger.info("Practice started")
    # Keep playing the recommended card
    no_card_count = 0
    MAX_NO_CARD_COUNT = 3
    while True:
        count = skill_card_count()
        if count == 0:
            logger.info("No skill card found. Wait and retry...")
            no_card_count += 1
            if no_card_count >= MAX_NO_CARD_COUNT:
                break
            sleep(3)
            continue
        if not click_recommended_card(card_count=count):
            break
        sleep(9)  # TODO: find a better way to detect that the practice turn has ended
    # Skip the animation
    logger.info("Recommend card not found. Practice finished.")
    ocr.expect_wait(contains("上昇"))
    device.click_center()
    logger.info("Wait practice finish animation...")
    # # Acquire the P-drink
    # sleep(7)  # TODO: find a better way to detect that the animation has ended
    # if image.wait_for(R.InPurodyuusu.PDrinkIcon, timeout=5):
    #     logger.info("Click to finish animation")
    #     device.click_center()
    #     sleep(1)
    # # Acquire the skill card
    # ocr.wait_for(contains("受け取るスキルカードを選んでください"))
    # logger.info("Acquire skill card")
    # acquire_skill_card()
    # # Wait for the loading animation
    # loading.wait_loading_start()
    # logger.info("Loading...")
    # loading.wait_loading_end()
    # logger.info("Loading end")
    # # Goal reached ("達成") check
    # if ocr.wait_for(contains("達成"), timeout=5):
    #     logger.debug("達成: clicked")
    #     device.click_center()
    #     sleep(2)
    #     logger.debug("達成: clicked 2")
    #     device.click_center()
def exam():
    """Run an exam."""
    logger.info("Wait for exam scene...")
    # TODO: wait for the exam to actually start
    logger.info("Exam started")
    # Keep playing the recommended card
    no_card_count = 0
    MAX_NO_CARD_COUNT = 3
    while True:
        count = skill_card_count()
        if count == 0:
            logger.info("No skill card found. Wait and retry...")
            no_card_count += 1
            if no_card_count >= MAX_NO_CARD_COUNT:
                break
            sleep(3)
            continue
        if not click_recommended_card(card_count=count):
            break
        sleep(9)  # TODO: find a better way to detect that the exam turn has ended
    # Click "次へ"
    device.click(image.expect_wait(R.InPurodyuusu.NextBtn))
    while ocr.wait_for(contains("メモリー"), timeout=7):
        device.click_center()
    # Acquire the skill card
    acquire_skill_card()
def hajime_regular(week: int = -1, start_from: int = 1):
    """
    Regular mode
    :param week: week number, starting from 1; -1 means run all weeks
    :param start_from: when running all weeks, the 1-based week to start from
    """
    def week1():
        """
        Week 1 (5 weeks before the midterm exam)\n
        Actions: Vo.レッスン、Da.レッスン、Vi.レッスン
        """
        enter_recommended_action()
        loading.wait_loading_start()
        logger.info("Loading...")
        loading.wait_loading_end()
        logger.info("Loading end")
        # Support card check
        practice()
    def week2():
        """
        Week 2 (4 weeks before the midterm exam)\n
        Actions: 授業 (classes)
        """
        logger.info("Regular week 2 started.")
        # Click "授業"
        rect = image.expect_wait(R.InPurodyuusu.Action.ActionStudy).rect
        device.click(rect)
        sleep(0.5)
        device.click(rect)
        # Wait for loading
        loading.wait_loading_start()
        logger.info("Loading...")
        # Wait for loading to end
        loading.wait_loading_end()
        logger.info("Loading end")
        # Check whether a support card event was triggered
        # TODO: check whether there is a skill card to claim from a support card
        # Wait for loading
        loading.wait_loading_start()
        logger.info("Loading...")
        # Wait for loading to end
        loading.wait_loading_end()
        logger.info("Loading end")
        # Enter the 授業 screen
        pos = image.expect_wait(R.InPurodyuusu.Action.VocalWhiteBg).rect
        device.click(pos)
        sleep(0.5)
        device.click(pos)
        # Pick an option
        # TODO: do not always click Vocal
        device.double_click(image.expect_wait(R.InPurodyuusu.Action.VocalWhiteBg).rect)
        # Acquire the skill card
        acquire_skill_card()
        # Three loading screens in a row
        loading.wait_loading_start()
        logger.info("Loading 1...")
        loading.wait_loading_end()
        logger.info("Loading 1 end")
        loading.wait_loading_start()
        logger.info("Loading 2...")
        loading.wait_loading_end()
        logger.info("Loading 2 end")
        loading.wait_loading_start()
        logger.info("Loading 3...")
        loading.wait_loading_end()
        logger.info("Loading 3 end")
    def week3():
        """
        Week 3 (3 weeks before the midterm exam)\n
        Actions: Vo.レッスン、Da.レッスン、Vi.レッスン、授業
        """
        logger.info("Regular week 3 started.")
        week1()
    def week4():
        """
        Week 4 (2 weeks before the midterm exam)\n
        Actions: おでかけ、相談、活動支給
        """
        logger.info("Regular week 4 started.")
        week3()
    def week5():
        """TODO"""
    def week6():
        """Midterm exam."""
        logger.info("Regular week 6 started.")
    def week7():
        """Week 7 (6 weeks before the final exam)."""
        logger.info("Regular week 7 started.")
        if not enter_recommended_action():
            rest()
    def week8():
        """
        Week 8 (5 weeks before the final exam)\n
        Actions: 授業、活動支給
        """
        logger.info("Regular week 8 started.")
        if not enter_recommended_action():
            rest()
    def week_common():
        if not enter_recommended_action():
            rest()
        else:
            sleep(5)
            until_practice_scene()
            practice()
        until_action_scene()
    def week_final():
        if not enter_recommended_action(final_week=True):
            raise ValueError("Failed to enter recommended action on final week.")
        sleep(5)
        until_practice_scene()
        practice()
        # until_exam_scene()
    weeks = [
        week_common, # 1
        week_common, # 2
        week_common, # 3
        week_common, # 4
        week_final,  # 5
        exam,        # 6
        week_common, # 7
        week_common, # 8
        week_common, # 9
        week_common, # 10
        week_common, # 11
        week_final,  # 12
        exam,        # 13
    ]
    if week != -1:
        weeks[week - 1]()
    else:
        for w in weeks[start_from - 1:]:
            w()
def purodyuusu(
    # TODO: parameters: members, support cards, memories, two items
):
    # Flow:
    # 1. Sensei dialogue
    # 2. Idol dialogue
    # 3. Acquire the P-drink
    # 4. Support card event; once triggered, an item must be claimed
    pass
__actions__ = [enter_recommended_action]
if __name__ == '__main__':
    import logging
    from logging import getLogger
    logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
    getLogger('kotonebot').setLevel(logging.DEBUG)
    getLogger(__name__).setLevel(logging.DEBUG)
    # exam()
    # enter_recommended_action()
    # remaining_turns_and_points()
    practice()
    # action_end()
    # acquire_pdorinku(0)
    # image.wait_for(R.InPurodyuusu.InPractice.PDorinkuIcon)
    # hajime_regular(start_from=5)
    # until_practice_scene()
    # device.click(image.expect_wait_any([
    #     R.InPurodyuusu.PSkillCardIconBlue,
    #     R.InPurodyuusu.PSkillCardIconColorful
    # ]).rect)
    # exam()
    # device.double_click(image.expect_wait(R.InPurodyuusu.Action.VocalWhiteBg).rect)
    # print(skill_card_count())
    # click_recommended_card(card_count=skill_card_count())
    # click_recommended_card(card_count=2)
    # acquire_skill_card()
    # rest()
    # enter_recommended_lesson(final_week=True)


@ -0,0 +1,42 @@
import time
from time import sleep
import cv2
import numpy as np
from kotonebot import image, device, debug
def loading() -> bool:
    """Detect whether the scene-loading screen is showing."""
    img = device.screenshot()
    # Binarize the image
    _, img = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)
    debug.show(img)
    # Crop the top 10%
    img = img[:int(img.shape[0] * 0.1), :]
    debug.show(img)
    # Check whether the image contains at most 2 distinct colors
    # https://stackoverflow.com/questions/56606294/count-number-of-unique-colours-in-image
    b, g, r = cv2.split(img)
    # Pack each BGR triple into a single integer so np.unique can count colors
    shifted_im = b.astype(np.int64) + 1000 * (g.astype(np.int64) + 1) + 1000 * 1000 * (r.astype(np.int64) + 1)
    return len(np.unique(shifted_im)) <= 2
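# A standalone sketch of the color-counting trick above (illustration only;
# not part of the module's flow). After binarization every channel is 0 or
# 255, so packing B/G/R into one integer lets np.unique count distinct
# colors; a loading screen is assumed to be nearly flat, hence "<= 2 colors".
def _count_colors_demo():
    import numpy as np
    img = np.zeros((4, 4, 3), dtype=np.uint8)
    img[:2] = (255, 255, 255)  # top half white, bottom half black
    b, g, r = img[..., 0], img[..., 1], img[..., 2]
    packed = b.astype(np.int64) + 1000 * (g.astype(np.int64) + 1) + 1000 * 1000 * (r.astype(np.int64) + 1)
    assert len(np.unique(packed)) == 2  # exactly two distinct colors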
def wait_loading_start(timeout: float = 10):
    """Wait for loading to start."""
    start_time = time.time()
    while not loading():
        if time.time() - start_time > timeout:
            raise TimeoutError('Timed out waiting for loading to start')
        sleep(0.5)
def wait_loading_end(timeout: float = 10):
    """Wait for loading to end."""
    start_time = time.time()
    while loading():
        if time.time() - start_time > timeout:
            raise TimeoutError('Timed out waiting for loading to end')
        sleep(0.5)
if __name__ == '__main__':
print(loading())
input()


@ -0,0 +1,63 @@
from time import sleep
from logging import getLogger
from kotonebot import device, ocr, image, Rect
from .. import R
logger = getLogger(__name__)
# TODO: add a device.snapshot() method that caches the current screen to avoid repeated screenshots.
# TODO: compare the performance of OCR vs. template matching for text; if template matching
#       is better, use OCR only the first time, screenshot the match, and use template matching afterwards.
def is_on_pdorinku_acquisition() -> bool:
    """Check whether the P-drink acquisition screen is showing."""
    return ocr.find('受け取るPドリンクを選んでください。') is not None
def list_pdorinku() -> list[tuple[str, Rect]]:
    """
    List the P-drinks currently offered.
    :return: detection results, `[(drink name, drink rect), ...]`
    """
    # Crop out every drink
    # TODO: automatically record unknown drinks
    dorinkus = image.find_crop(
        R.InPurodyuusu.Action.PDorinkuBg,
        mask=R.InPurodyuusu.Action.PDorinkuBgMask,
    )
    return [
        ('', dorinku.rect)  # TODO: read the drink name
        for dorinku in dorinkus
    ]
def acquire_pdorinku(index: int):
    """
    Acquire a P-drink.
    :param index: index of the P-drink to acquire, starting from 0
    """
    # TODO: instead of picking an arbitrary drink, choose the best one for the situation
    # Click the drink
    drinks = list_pdorinku()
    dorinku = drinks[index]
    device.click(dorinku[1])
    logger.debug(f"Pドリンク clicked: {dorinku[0]}")
    sleep(0.3)
    # Confirm button
    ret = ocr.expect('受け取る')
    device.click(ret.rect)
    logger.debug("受け取る clicked")
    sleep(1.3)
    # Confirm again
    device.click_center()
    logger.debug("confirm again clicked")
__actions__ = [acquire_pdorinku]
if __name__ == '__main__':
from pprint import pprint as print
# print(list_pdorinku())
acquire_pdorinku(0)
input()


@ -0,0 +1,7 @@
from kotonebot.backend.context import device
def start_game():
    """Launch the game."""
    device.launch_app("com.bandainamcoent.idolmaster_gakuen")
__actions__ = [start_game]


@ -0,0 +1,14 @@
from kotonebot.backend.util import TaskInfo
from kotonebot.backend.context import device
from kotonebot.tasks.actions.start_game import start_game
def purodyuusu(
):
    # TODO: implement the produce task flow
    pass
__task__ = TaskInfo(
    name="purodyuusu",
    description="run one produce session",
    entry=purodyuusu
)

loop_detect fixed.py Normal file

@ -0,0 +1,246 @@
import cv2
from cv2.typing import MatLike
import numpy as np
from client.device.adb import AdbDevice
from adbutils import adb
from typing import NamedTuple
# Initialize the ADB device
adb.connect("127.0.0.1:16384")
device = AdbDevice(adb.device_list()[0])
# Detection parameters
TARGET_ASPECT_RATIO_RANGE = (0.73, 0.80)
TARGET_COLOR = (240, 240, 240)
YELLOW_LOWER = np.array([20, 100, 100])
YELLOW_UPPER = np.array([30, 255, 255])
GLOW_EXTENSION = 10    # pixels to extend outwards
GLOW_THRESHOLD = 1200  # glow value threshold
import ctypes
user32 = ctypes.windll.user32
user32.SetProcessDPIAware()
screen = user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
class CardDetectResult(NamedTuple):
x: int
y: int
w: int
h: int
glow_value: int
is_target: bool
def show(title, image):
img_h, img_w = image.shape[:2]
scale_x = screen[0] * 0.8 / img_w
scale_y = screen[1] * 0.8 / img_h
if scale_x < 1 or scale_y < 1:
scale = min(scale_x, scale_y)
resized = cv2.resize(image, (0, 0), fx=scale, fy=scale)
else:
resized = image
cv2.imshow(title, resized)
# cv2.waitKey(0)
# cv2.destroyWindow(title)
# Fixed card coordinates
# for: 1280x720
CARD_POSITIONS = [
    (47, 883, 192, 252),  # left card (x, y, w, h)
    (264, 883, 192, 252), # middle card
    (481, 883, 192, 252)  # right card
]
CARD_PAD = 25
CARD_SCREEN_PAD = 17
def calc_pos(count: int) -> list[tuple[int, int, int, int]]:
    # Algorithm: based on CARD_POSITIONS (the 3-card layout), compute the
    # positions of `count` cards. If there are too many cards to keep the
    # original spacing, switch to an overlapping layout while keeping
    # CARD_SCREEN_PAD between the cards and both screen edges.
    # With a single card, return the middle position directly
    if count == 1:
        middle_card = CARD_POSITIONS[1]  # middle card position
        return [middle_card]
    # Original card spacing
    card_spacing = CARD_POSITIONS[1][0] - CARD_POSITIONS[0][0]  # x difference between adjacent cards
    card_width = CARD_POSITIONS[0][2]
    # Usable screen width (minus the padding on both sides)
    screen_width = 720
    available_width = screen_width - (CARD_SCREEN_PAD * 2)
    # Total width when using the original spacing
    original_total_width = (count - 1) * card_spacing + card_width
    # Decide whether an overlapping layout is needed
    if original_total_width > available_width:
        # Overlapping layout: distribute the leftover width over the
        # (count - 1) gaps; the gap is negative, so the cards overlap.
        spacing = (available_width - card_width * count) // (count - 1)
        start_x = CARD_SCREEN_PAD
    else:
        # Use the original spacing, centered horizontally
        spacing = card_spacing
        start_x = (screen_width - original_total_width) // 2
    # Generate all card positions
    positions = []
    x = start_x
    for i in range(count):
        # y, w, h stay the same; reuse the first card's values
        y = CARD_POSITIONS[0][1]
        w = CARD_POSITIONS[0][2]
        h = CARD_POSITIONS[0][3]
        positions.append((x, y, w, h))
        x += spacing + card_width  # advance to the next card's x
    # Round everything to integers
    positions = [(round(x), round(y), round(w), round(h)) for x, y, w, h in positions]
    return positions
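# Quick sanity check (illustrative only, assuming the 1280x720 constants
# above): with 4 cards the original 217 px spacing no longer fits into the
# 686 px of usable width, so the gap becomes (686 - 4 * 192) // 3 = -28 px
# and the cards overlap.
def _calc_pos_demo():
    for n in (1, 3, 4, 5):
        print(n, calc_pos(n))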
def detect_cards(image: MatLike, card_dimensions: list[tuple[int, int, int, int]]) -> list[CardDetectResult]:
    card_contours = []
    preview = image.copy()
    # Preview with all cards outlined
    pv = image.copy()
    # for x, y, w, h in CARD_POSITIONS:
    #     cv2.rectangle(pv, (x, y), (x+w, y+h), (0, 255, 0), 1)
    #     # Draw the outer (glow) region in red
    #     cv2.rectangle(pv, (x-GLOW_EXTENSION, y-GLOW_EXTENSION), (x+w+GLOW_EXTENSION, y+h+GLOW_EXTENSION), (0, 0, 255), 1)
    # show("pv", pv)
    for x, y, w, h in card_dimensions:
        # Coordinates of the card region expanded by the glow margin
        outer_x = max(0, x - GLOW_EXTENSION)
        outer_y = max(0, y - GLOW_EXTENSION)
        outer_w = w + (GLOW_EXTENSION * 2)
        outer_h = h + (GLOW_EXTENSION * 2)
        # Extract the outer (expanded) and inner (card) regions
        outer_region = image[outer_y:y+h+GLOW_EXTENSION, outer_x:x+w+GLOW_EXTENSION]
        inner_region = image[y:y+h, x:x+w]
        # Convert both regions to HSV for color masking
        outer_hsv = cv2.cvtColor(outer_region, cv2.COLOR_BGR2HSV)
        inner_hsv = cv2.cvtColor(inner_region, cv2.COLOR_BGR2HSV)
        # Mask the yellow pixels in each region
        outer_mask = cv2.inRange(outer_hsv, YELLOW_LOWER, YELLOW_UPPER)
        inner_mask = cv2.inRange(inner_hsv, YELLOW_LOWER, YELLOW_UPPER)
        # Build a ring mask: only the expanded border area counts toward the glow
        ring_mask = outer_mask.copy()
        ring_mask[GLOW_EXTENSION:GLOW_EXTENSION+h, GLOW_EXTENSION:GLOW_EXTENSION+w] = 0
        # Glow value = number of yellow pixels in the ring
        glow_value = cv2.countNonZero(ring_mask)
        card_contours.append(CardDetectResult(
            x,
            y,
            w,
            h,
            glow_value,
            glow_value > GLOW_THRESHOLD
        ))
        # Draw the inner and outer regions on the preview image
        cv2.rectangle(preview, (outer_x, outer_y), (outer_x+outer_w, outer_y+outer_h), (0, 0, 255), 2)
        cv2.rectangle(preview, (x, y), (x+w, y+h), (0, 255, 0), 2)
        cv2.putText(preview, f"Glow: {glow_value}", (x, y-10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2, cv2.LINE_AA)
        if glow_value > GLOW_THRESHOLD:
            cv2.putText(preview, "TargetCard", (x, y+20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
    show("cards", preview)
    cv2.waitKey(1)
    return card_contours
def main():
    # while True:
    #     # Take a screenshot
    #     img = device.screenshot()
    #     # Detect cards
    #     cards = detect_cards(img)
    #     # If 3 or more cards were detected
    #     if len(cards) >= 3:
    #         print("3 cards detected")
    #         # Draw the detection results on the image
    #         for i, (x, y, w, h, glow) in enumerate(cards[:3]):
    #             cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
    #             cv2.putText(img, f"Card {i+1}", (x, y-10),
    #                         cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
    #         # Show the result
    #         cv2.imshow("Detected Cards", img)
    #         # cv2.waitKey(0)
    #         # cv2.destroyAllWindows()
    #         # break
    #     # Wait 1 second before detecting again
    #     cv2.waitKey(1000)
    from kotonebot.client.device.fast_screenshot import AdbFastScreenshots
    with AdbFastScreenshots(
        adb_path=r"D:\SDK\Android\platform-tools\adb.exe",
        device_serial="127.0.0.1:16384",
        time_interval=179,
        width=720,
        height=1280,
        bitrate="5M",
        use_busybox=False,
        connect_to_device=True,
        screenshotbuffer=10,
        go_idle=0,
    ) as adbscreen:
        pos_tobe_clicked = None
        pos_clicked_count = 0
        for image in adbscreen:
            if pos_tobe_clicked is not None:
                pos_clicked_count += 1
                if pos_clicked_count >= 2:
                    pos_tobe_clicked = None
                    pos_clicked_count = 0
                    continue
                device.click(*pos_tobe_clicked)
            # Current frame
            img = image
            # Detect cards
            cards = detect_cards(img, CARD_POSITIONS)
            # If 3 or more cards were detected
            if len(cards) >= 3:
                # print("3 cards detected")
                # Draw the detection results on the image
                for i, card in enumerate(cards[:3]):
                    cv2.rectangle(img, (card.x, card.y), (card.x+card.w, card.y+card.h), (0, 255, 0), 2)
                    cv2.putText(img, f"Card {i+1}", (card.x, card.y-10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
                # Print the maximum glow value
                print(f"max glow value: {max(card.glow_value for card in cards)}")
            # Show the result
            # cv2.imshow("Detected Cards", img)
            cv2.waitKey(1)
            # Click the target card if there is one
            if not pos_tobe_clicked and any(card.is_target for card in cards):
                target_card = next(card for card in cards if card.is_target)
                pos = (target_card.x + target_card.w // 2, target_card.y + target_card.h // 2)
                print(f"click position: {pos}")
                pos_tobe_clicked = pos
                pos_clicked_count = 0
# TODO: do not exhaust stamina in the practice right before the final exam
if __name__ == "__main__":
    main()

loop_detect.py Normal file

@ -0,0 +1,121 @@
import cv2
import numpy as np
from device.adb import AdbDevice
from adbutils import adb
# Initialize the ADB device
adb.connect("127.0.0.1:16384")
device = AdbDevice(adb.device_list()[0])
# Detection parameters
TARGET_ASPECT_RATIO_RANGE = (0.73, 0.80)
TARGET_COLOR = (240, 240, 240)
YELLOW_LOWER = np.array([20, 100, 100])
YELLOW_UPPER = np.array([30, 255, 255])
import ctypes
user32 = ctypes.windll.user32
user32.SetProcessDPIAware()
screen = user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
def show(title, image):
img_h, img_w = image.shape[:2]
scale_x = screen[0] * 0.8 / img_w
scale_y = screen[1] * 0.8 / img_h
if scale_x < 1 or scale_y < 1:
scale = min(scale_x, scale_y)
resized = cv2.resize(image, (0, 0), fx=scale, fy=scale)
else:
resized = image
cv2.imshow(title, resized)
# cv2.waitKey(0)
# cv2.destroyWindow(title)
def detect_cards(image):
    original = image.copy()
    # Save a copy for inspection
    cv2.imwrite("original.png", original)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 190, 255, cv2.THRESH_BINARY)
    edges = cv2.Canny(binary, 150, 300)
    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    show("edges", edges)
    card_contours = []
    for i, contour in enumerate(contours):
        area = cv2.contourArea(contour)
        if area > 400:
            peri = cv2.arcLength(contour, True)
            approx = cv2.approxPolyDP(contour, 0.02 * peri, True)
            x, y, w, h = cv2.boundingRect(approx)
            # Show the cropped region
            card_region = image[y:y+h, x:x+w]
            show(f"card_region", card_region)
            # Check the aspect ratio
            aspect_ratio = w / float(h)
            if not (TARGET_ASPECT_RATIO_RANGE[0] < aspect_ratio < TARGET_ASPECT_RATIO_RANGE[1]):
                continue
            # Check the corner colors
            card_region = image[y:y+h, x:x+w]
            bottom_right = card_region[-40:, -40:]
            bottom_left = card_region[-40:, :40]
            avg_color_br = np.mean(bottom_right, axis=(0, 1))
            avg_color_bl = np.mean(bottom_left, axis=(0, 1))
            if not (np.allclose(avg_color_br, TARGET_COLOR, atol=5) and
                    np.allclose(avg_color_bl, TARGET_COLOR, atol=5)):
                continue
            # Compute the yellow glow value
            hsv = cv2.cvtColor(card_region, cv2.COLOR_BGR2HSV)
            mask = cv2.inRange(hsv, YELLOW_LOWER, YELLOW_UPPER)
            glow_value = cv2.countNonZero(mask)
            card_contours.append((x, y, w, h, glow_value))
    # Draw all detected contours on a preview image, in order
    preview = image.copy()
    for i, (x, y, w, h, glow_value) in enumerate(card_contours):
        cv2.rectangle(preview, (x, y), (x+w, y+h), (0, 255, 0), 2)
        cv2.putText(preview, f"Card {i+1}", (x, y-10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2, cv2.LINE_AA)
    show("cards", preview)
    return card_contours
def main():
    while True:
        # Take a screenshot
        img = device.screenshot()
        # Detect cards
        cards = detect_cards(img)
        # If 3 or more cards were detected
        if len(cards) >= 3:
            print("3 cards detected")
            # Draw the detection results on the image
            for i, (x, y, w, h, glow) in enumerate(cards[:3]):
                cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
                cv2.putText(img, f"Card {i+1}", (x, y-10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
            # Show the result
            cv2.imshow("Detected Cards", img)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
            break
        # Wait 1 second before detecting again
        cv2.waitKey(1000)
if __name__ == "__main__":
main()

mask_temp.py Normal file

@ -0,0 +1,49 @@
import cv2
import numpy as np
# Load the images
template = cv2.imread('tests/images/pdorinku.png')
mask = cv2.imread('test_mask.png')
image = cv2.imread('tests/images/acquire_pdorinku.png')
# Print the shapes
print(template.shape)
print(mask.shape)
print(image.shape)
# Binarize the mask
# mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)[1]
# Invert
# Convert the mask to a single-channel grayscale image
mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
# mask = cv2.bitwise_not(mask)
cv2.imshow('mask', mask)
# Show the masked template
# Make sure the mask and the template have the same size
# mask = cv2.resize(mask, (template.shape[1], template.shape[0]))
masked_template = cv2.bitwise_and(template, template, mask=mask)
cv2.imshow('masked_template', masked_template)
# Template matching
result = cv2.matchTemplate(image, template, cv2.TM_CCORR_NORMED, mask=mask)
cv2.imshow('result', result)
# Best match position
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
# Template size
h, w = template.shape[:2]
# Draw a rectangle around the match on the original image
top_left = max_loc
bottom_right = (top_left[0] + w, top_left[1] + h)
cv2.rectangle(image, top_left, bottom_right, (0, 0, 255), 2)
# Show the result
# Scale to 1/2
image = cv2.resize(image, (0, 0), fx=0.5, fy=0.5)
cv2.imshow('Result', image)
cv2.waitKey(0)
cv2.destroyAllWindows()

original.png (binary, 1.1 MiB)

requirements.dev.txt Normal file

@ -0,0 +1,2 @@
jinja2==3.1.5
pyinstaller==6.11.1

requirements.txt Normal file

@ -0,0 +1,6 @@
adbutils==2.8.0
opencv-python==4.10.0.84
rapidocr_onnxruntime==1.4.3
av==14.0.1
thefuzz==0.22.1
typing-extensions==4.12.2


test_images/1.jpg (binary, 1.3 MiB)


test_mask.png (binary, 461 B)


tests/images/pdorinku.png (binary, 7.6 KiB)



@ -0,0 +1,31 @@
import unittest
from kotonebot import _c
from kotonebot.tasks.actions.in_purodyuusu import skill_card_count
from util import MockDevice
class TestActionInProduce(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.d = MockDevice()
_c.inject_device(cls.d)
def test_current_skill_card_count(self):
cards_1 = 'tests/images/produce/in_produce_cards_1.png'
cards_2 = 'tests/images/produce/in_produce_cards_2.png'
cards_3 = 'tests/images/produce/in_produce_cards_3.png'
cards_4 = 'tests/images/produce/in_produce_cards_4.png'
cards_4_1 = 'tests/images/produce/in_produce_cards_4_1.png'
self.d.screenshot_path = cards_1
self.assertEqual(skill_card_count(), 1)
self.d.screenshot_path = cards_2
self.assertEqual(skill_card_count(), 2)
self.d.screenshot_path = cards_3
self.assertEqual(skill_card_count(), 3)
self.d.screenshot_path = cards_4
self.assertEqual(skill_card_count(), 4)
self.d.screenshot_path = cards_4_1
self.assertEqual(skill_card_count(), 4)


@ -0,0 +1,30 @@
import unittest
from kotonebot.backend.context import _c
from kotonebot import device, debug
from util import *
debug.save_images = True
debug.save_images_dir = "tests/output_images"
class TestActionLoading(unittest.TestCase):
loadings = [f'tests/images/ui/loading_{i}.png' for i in range(1, 10)]
not_loadings = [f'tests/images/ui/not_loading_{i}.png' for i in range(1, 5)]
@classmethod
def setUpClass(cls):
cls.d = MockDevice('')
_c.inject_device(cls.d)
def test_loading(self):
for loading in self.loadings:
self.d.screenshot_path = loading
from kotonebot.tasks.actions import loading
self.assertTrue(loading.loading())
def test_not_loading(self):
for not_loading in self.not_loadings:
self.d.screenshot_path = not_loading
from kotonebot.tasks.actions import loading
self.assertFalse(loading.loading())

tests/test_backend.py Normal file

@ -0,0 +1,39 @@
import unittest
import numpy as np
from kotonebot.backend.util import crop, crop_y, crop_x
class TestBackendUtils(unittest.TestCase):
    def setUp(self):
        # Create a 10x10 test image
        self.test_img = np.zeros((10, 10, 3), dtype=np.uint8)
    def test_crop(self):
        # Plain crop
        result = crop(self.test_img, 0.2, 0.2, 0.8, 0.8)
        self.assertEqual(result.shape, (6, 6, 3))
        # Boundary values
        result = crop(self.test_img, 0, 0, 1, 1)
        self.assertEqual(result.shape, (10, 10, 3))
        # Minimal crop
        result = crop(self.test_img, 0.4, 0.4, 0.6, 0.6)
        self.assertEqual(result.shape, (2, 2, 3))
    def test_crop_y(self):
        # Vertical crop
        result = crop_y(self.test_img, 0.2, 0.8)
        self.assertEqual(result.shape, (6, 10, 3))
        # Boundary values
        result = crop_y(self.test_img, 0, 1)
        self.assertEqual(result.shape, (10, 10, 3))
    def test_crop_x(self):
        # Horizontal crop
        result = crop_x(self.test_img, 0.2, 0.8)
        self.assertEqual(result.shape, (10, 6, 3))
        # Boundary values
        result = crop_x(self.test_img, 0, 1)
        self.assertEqual(result.shape, (10, 10, 3))

tests/test_ocr.py Normal file

@ -0,0 +1,19 @@
import unittest
from kotonebot.backend.ocr import jp
import cv2
class TestOcr(unittest.TestCase):
def setUp(self):
self.img = cv2.imread('test_images/acquire_pdorinku.png')
def test_ocr_ocr(self):
result = jp.ocr(self.img)
self.assertGreater(len(result), 0)
def test_ocr_find(self):
self.assertTrue(jp.find(self.img, '中間まで'))
self.assertTrue(jp.find(self.img, '受け取るPドリンクを選んでください。'))
self.assertTrue(jp.find(self.img, '受け取る'))


@ -0,0 +1,70 @@
import unittest
import cv2
from kotonebot.backend.image import template_match, find_crop
def save(image, name: str):
import os
if not os.path.exists('./tests/output_images'):
os.makedirs('./tests/output_images')
cv2.imwrite(f'./tests/output_images/{name}.png', image)
class TestTemplateMatch(unittest.TestCase):
def setUp(self):
self.template = cv2.imread('tests/images/pdorinku.png')
self.mask = cv2.imread('tests/images/pdorinku_mask.png')
self.image = cv2.imread('tests/images/acquire_pdorinku.png')
def __assert_pos(self, result, x, y, offset=10):
self.assertGreater(result.position[0], x - offset)
self.assertGreater(result.position[1], y - offset)
self.assertLess(result.position[0], x + offset)
self.assertLess(result.position[1], y + offset)
    def test_basic(self):
        result = template_match(self.template, self.image)
        # Outline the result and save it
        cv2.rectangle(self.image, result[0].rect, (0, 0, 255), 2)
        save(self.image, 'TestTemplateMatch.basic')
        self.assertGreater(len(result), 0)
        self.assertGreater(result[0].score, 0.9)
        # Position should be near (167, 829)
        self.__assert_pos(result[0], 167, 829)
    def test_masked(self):
        result = template_match(
            self.template,
            self.image,
            mask=self.mask,
            max_results=3,
            remove_duplicate=False,
            threshold=0.999,
        )
        # Outline the results and save them
        for i, r in enumerate(result):
            cv2.rectangle(self.image, r.rect, (0, 0, 255), 2)
        save(self.image, 'TestTemplateMatch.masked')
        self.assertEqual(len(result), 3)
        self.assertGreater(result[0].score, 0.9)
        self.assertGreater(result[1].score, 0.9)
        self.assertGreater(result[2].score, 0.9)
        # Positions should be near (167, 829), (306, 829) and (444, 829)
        self.__assert_pos(result[0], 167, 829)
        self.__assert_pos(result[1], 306, 829)
        self.__assert_pos(result[2], 444, 829)
def test_crop(self):
result = find_crop(
self.image,
self.template,
self.mask,
threshold=0.999,
)
for i, r in enumerate(result):
cv2.imwrite(f'./tests/output_images/TestTemplateMatch.crop_{i}.png', r.image)
self.assertEqual(len(result), 3)

tests/util.py Normal file

@ -0,0 +1,50 @@
from typing import Sequence, overload
from typing_extensions import override
import cv2
from cv2.typing import MatLike
from kotonebot.backend.util import Rect
from kotonebot.client.protocol import DeviceProtocol
class MockDevice(DeviceProtocol):
def __init__(
self,
screenshot_path: str = '',
):
self.screenshot_path = screenshot_path
self.screenshot_hook = None
@override
def screenshot(self) -> MatLike:
img = cv2.imread(self.screenshot_path)
if self.screenshot_hook is not None:
img = self.screenshot_hook(img)
return img
@staticmethod
def list_devices() -> list[str]:
raise NotImplementedError
def launch_app(self, package_name: str) -> None:
raise NotImplementedError
@overload
def click(self, x: int, y: int) -> None:
...
@overload
def click(self, rect: Sequence[int]) -> None:
...
def click(self, *args, **kwargs):
raise NotImplementedError
def swipe(self, x1: int, y1: int, x2: int, y2: int) -> None:
raise NotImplementedError
@property
def screen_size(self) -> tuple[int, int]:
raise NotImplementedError

tools/R.jinja2 Normal file

@ -0,0 +1,39 @@
####### Image resource file #######
####### This file is auto-generated. Do not edit. #######
####### AUTO GENERATED. DO NOT EDIT. #######
import os
{% for lang in data -%}
{%- for class_name, attrs in lang.resources.items() -%}
class {{ class_name }}:
{%- for attr in attrs.values() %}
# {{ attr }}
{% if attr.type == 'image' -%}
{{ attr.name }} = {{ attr.value }}
"""
Path: {{ attr.rel_path }}<br>
Module: `{{ attr.class_path|join('.') }}.{{ attr.name }}`<br>
<img src="vscode-file://vscode-app/{{ attr.abspath }}" style="max-width: 200px;">
"""
{%- elif attr.type == 'next_class' -%}
{{ attr.name }} = {{ attr.value }}
{%- endif -%}
{% endfor %}
pass
{% endfor %}
{# class {{ lang.language }}:
# {% for attr in lang.attrs %}
# {{ attr.name }} = os.path.abspath("{{ attr.path }}")
# """
# Path: `{{ attr.path }}`<br>
# Module: `{{ attr.class_path|join('.') }}.{{ attr.name }}`<br>
# <img src="vscode-file://vscode-app/{{ attr.abspath }}" style="max-width: 200px;">
# """
# {% endfor %}
#}
{% endfor %}

Some files were not shown because too many files have changed in this diff.