feat(*): 重构并优化了部分培育流程 & 修复了一些 bug

1. 修复 HintBox 与原有 Rect 类型并不兼容的问题
2. 新增 SimpleDispatcher 类(试验性)
3. 移除 OCR 识别前的灰度处理,pad 填充支持灰度图
4. OcrResult 新增 replace() 方法,用于快速替换文本
5. 新增 Countdown 类,用于在 Dispatcher 循环中倒计时
6. 优化截图继承方式
7. Context 类新增 inject() 方法,新增支持视频输入的 MockDevice 类,便于测试
8. 调整 device.double_click() 默认间隔
9. 优化培育中奖励领取相关逻辑(acquisitions 等相关函数),提高检测处理速度
10. 重构培育流程,合并 week_lesson 与 week_non_lesson 处理逻辑为 week_normal
11. 新增基本 PRO 培育支持
12. 重构练习/考试中推荐卡的检测,大幅提高了期末考试中的检测准确度。
13. 重构练习/考试逻辑,移除硬编码的等待下一回合 sleep
This commit is contained in:
XcantloadX 2025-02-11 12:31:47 +08:00
parent 22f09ef8ef
commit 923e3b8af2
36 changed files with 1372 additions and 1407 deletions

53
experiments/1.py Normal file
View File

@ -0,0 +1,53 @@
from typing import Callable, ParamSpec, Generic, TypeVar
P = ParamSpec('P')
R = TypeVar('R')
class LogCallable(Generic[P, R]):
def __init__(self, func: Callable[P, R]):
self.func = func
self.log = None
def use(self, *, log: str):
self.log = log
return self
def __reset(self):
self.log = None
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
if self.log is not None:
print(self.log)
ret = self.func(*args, **kwargs)
self.__reset()
return ret
def log(func: Callable[P, None]):
return LogCallable(func)
@log
def add(a: int, b: int):
print(a + b)
# with log
add.use(log='adding 1 and 2')(1, 2)
# without log
add(1, 2)
P = ParamSpec("P")
R = TypeVar("R")
def wrap_with_id(fn: Callable[P, R]):
def wrapper(id: str, *args: P.args, **kwargs: P.kwargs) -> R:
return fn(*args, **kwargs)
return wrapper
@wrap_with_id
def func(x: int, y: int) -> str:
...
func

300
experiments/card.py Normal file
View File

@ -0,0 +1,300 @@
from time import sleep
import cv2
from cv2.typing import MatLike
import numpy as np
from typing import NamedTuple
from kotonebot.client.device.fast_screenshot import AdbFastScreenshots
def cv_imread(filePath):
cv_img=cv2.imdecode(np.fromfile(filePath,dtype=np.uint8),-1)
## imdecode读取的是rgb如果后续需要opencv处理的话需要转换成bgr转换后图片颜色会变化
##cv_img=cv2.cvtColor(cv_img,cv2.COLOR_RGB2BGR)
return cv_img
def cv_imshow(name, img, overlay_msg: str = ''):
scale = 0.5
if overlay_msg:
cv2.putText(img, overlay_msg, (10, img.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow(name, cv2.resize(img, (int(img.shape[1] * scale), int(img.shape[0] * scale))))
def process(img, name):
original_img = img.copy()
# 转换为HSV颜色空间
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
# 目标HSV值: H=51°, S=46%, V=98%
# OpenCV中H范围是0-180,S和V是0-255
# 所以需要转换:
# H: 51° -> 28.33 (51/360*180)
# S: 46% -> 117 (46/100*255)
# V: 98% -> 250 (98/100*255)
# upper_h = int(60/360*180)
# upper_s = int(100/100*255)
# upper_v = int(100/100*255)
# lower_h = int(45/360*180)
# lower_s = int(60/100*255)
# lower_v = int(60/100*255)
lower_h, lower_s, lower_v = 20, 100, 100
upper_h, upper_s, upper_v = 30, 255, 255
# 创建掩码
lower = np.array([lower_h, lower_s, lower_v])
upper = np.array([upper_h, upper_s, upper_v])
mask = cv2.inRange(hsv, lower, upper)
# 应用掩码
result = cv2.bitwise_and(img, img, mask=mask)
# 显示结果
# 水平拼接两张图片
combined = np.hstack((img, result))
return result, combined
# images = [
# cv_imread(r"D:\1.png"),
# cv_imread(r"D:\2.png"),
# cv_imread(r"D:\3.png"),
# cv_imread(r"D:\4.png"),
# ]
# for i, img in enumerate(images):
# process(img, f'{i}')
# cv2.waitKey(0)
# cv2.destroyAllWindows()
def stream_mp4(video: str, fps: int):
cap = cv2.VideoCapture(video)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
sleep(1 / fps)
yield frame
def stream_device():
while True:
img = device.screenshot()
yield img
def fast_screenshot():
with AdbFastScreenshots(
adb_path=r"D:\SDK\Android\platform-tools\adb.exe",
device_serial="127.0.0.1:5555",
time_interval=179,
width=720,
height=1280,
bitrate="10M",
use_busybox=False,
connect_to_device=True,
screenshotbuffer=10,
go_idle=0,
) as adbscreen:
for image in adbscreen:
yield image
# Converts BGR to CMYK (as a tuple of 4 arrays)
def bgr2cmky(bgrImage):
bgrdash = bgrImage.astype(float) / 255.
# Calculate K as (1 - whatever is biggest out of Rdash, Gdash, Bdash)
K = 1 - np.max(bgrdash, axis=2)
# Calculate C
C = (1 - bgrdash[..., 2] - K) / (1 - K)
C = 255 * C
C = C .astype(np.uint8)
# Calculate M
M = (1 - bgrdash[..., 1] - K) / (1 - K)
M = 255 * M
M = M.astype(np.uint8)
# Calculate Y
Y = (1 - bgrdash[..., 0] - K) / (1 - K)
Y = 255 * Y
Y = Y.astype(np.uint8)
return (C, M, Y, K)
class CardDetectResult(NamedTuple):
type: int
score: float
left_score: float
right_score: float
top_score: float
bottom_score: float
def calc(image: MatLike, card_count: int):
YELLOW_LOWER = np.array([15, 80, 80])
YELLOW_UPPER = np.array([35, 255, 255])
CARD_POSITIONS_1 = [
# 格式:(x, y, w, h, return_value)
(264, 883, 192, 252, 0)
]
CARD_POSITIONS_2 = [
(156, 883, 192, 252, 1),
(372, 883, 192, 252, 2),
# delta_x = 216, delta_x-width = 24
]
CARD_POSITIONS_3 = [
(47, 883, 192, 252, 0), # 左卡片 (x, y, w, h)
(264, 883, 192, 252, 1), # 中卡片
(481, 883, 192, 252, 2) # 右卡片
# delta_x = 217, delta_x-width = 25
]
CARD_POSITIONS_4 = [
(17, 883, 192, 252, 0),
(182, 883, 192, 252, 1),
(346, 883, 192, 252, 2),
(511, 883, 192, 252, 3),
# delta_x = 165, delta_x-width = -27
]
SKIP_POSITION = (621, 739, 85, 85, 10)
GLOW_EXTENSION = 15
if card_count == 1:
cards = CARD_POSITIONS_1
elif card_count == 2:
cards = CARD_POSITIONS_2
elif card_count == 3:
cards = CARD_POSITIONS_3
elif card_count == 4:
cards = CARD_POSITIONS_4
else:
raise ValueError(f"Unsupported card count: {card_count}")
cards.append(SKIP_POSITION)
results = []
for ix, (x, y, w, h, return_value) in enumerate(cards):
outer = (max(0, x - GLOW_EXTENSION), max(0, y - GLOW_EXTENSION))
inner = (x + w, y + h)
outer_pixels = (outer[0] - inner[0]) * (outer[1] - inner[1])
inner_pixels = w * h
# 裁剪出检测区域
glow_area = image[outer[1]:y + h + GLOW_EXTENSION, outer[0]:x + w + GLOW_EXTENSION]
area_h = glow_area.shape[0]
area_w = glow_area.shape[1]
glow_area[GLOW_EXTENSION:area_h-GLOW_EXTENSION, GLOW_EXTENSION:area_w-GLOW_EXTENSION] = 0
_glow = glow_area.copy()
# 计算黄色值
glow_area = cv2.cvtColor(glow_area, cv2.COLOR_BGR2HSV)
yellow_mask = cv2.inRange(glow_area, YELLOW_LOWER, YELLOW_UPPER)
# 排除 #edf37b (237, 243, 123) BGR
# 在 HSV 空间中大约是 (63, 147, 243)
# (57, 80%, 98%) -> (57, 204, 250)
exclude_mask = cv2.inRange(glow_area, np.array([50, 100, 100]), np.array([60, 220, 255]))
yellow_mask = cv2.bitwise_and(yellow_mask, cv2.bitwise_not(exclude_mask))
_masked = cv2.bitwise_and(_glow, _glow, mask=yellow_mask)
cv_imshow(str(return_value), np.hstack((_glow, _masked)))
# cv_imshow("detect_area " + str(ix), yellow_mask)
left_border = yellow_mask[:, 0:GLOW_EXTENSION]
right_border = yellow_mask[:, area_w-GLOW_EXTENSION:area_w]
top_border = yellow_mask[0:GLOW_EXTENSION, :]
bottom_border = yellow_mask[area_h-GLOW_EXTENSION:area_h, :]
y_border_pixels = area_h * GLOW_EXTENSION
x_border_pixels = area_w * GLOW_EXTENSION
left_score = np.count_nonzero(left_border) / y_border_pixels
right_score = np.count_nonzero(right_border) / y_border_pixels
top_score = np.count_nonzero(top_border) / x_border_pixels
bottom_score = np.count_nonzero(bottom_border) / x_border_pixels
result = (left_score + right_score + top_score + bottom_score) / 4
results.append(CardDetectResult(
return_value,
result,
left_score,
right_score,
top_score,
bottom_score
))
results.sort(key=lambda x: x.score, reverse=True)
def print_result(result):
# round to 3 decimal places
new_results = []
for result in results:
result = tuple(round(x, 3) for x in result)
new_results.append(result)
print(new_results)
# print_result(results)
if results[0].score > 0.1 and (
results[0].left_score > 0.01 and results[0].right_score > 0.01 and
results[0].top_score > 0.01 and results[0].bottom_score > 0.01
):
print_result(results)
# if results[0].type != 1:
# while True:
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
from kotonebot.backend.debug.mock import MockDevice
from kotonebot.backend.context import device, init_context, manual_context, inject_context
from kotonebot.tasks.actions.in_purodyuusu import handle_recommended_card, skill_card_count
from kotonebot.backend.util import Profiler
init_context()
mock = MockDevice()
vid = mock.load_video(r'D:\end.mp4', 60)
# inject_context(device=mock)
ctx = manual_context()
ctx.begin()
result = None
profiler = Profiler(file_path='profiler')
profiler.begin()
# sd = wait_card_stable2()
for img in stream_device():
new_result, combined = process(img, 'screenshot')
# 将结果累加
if result is not None:
result = cv2.add(result, new_result)
else:
result = new_result
overlay_msg = ''
# overlay_msg += f'skill_card_count: {skill_card_count()}'
# stable = next(sd, None)
# print(stable)
# overlay_msg += f'wait_card_stable2: {stable}'
# if stable == True:
# sd = wait_card_stable2()
# cv_imshow('result', result)
cv_imshow('combined', combined, overlay_msg)
# (C, M, Y, K) = bgr2cmky(img)
# Show K:
# cv_imshow("Y", Y)
calc(img, 3)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
profiler.end()
cv2.destroyAllWindows()
profiler.snakeviz()
ctx.end()

View File

@ -1,125 +0,0 @@
import cv2
import numpy as np
def detect_glowing_card(image_path):
# 读取图像
img = cv2.imread(image_path)
# 转换到HSV色彩空间
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
# 1. 首先检测卡片轮廓
# 转换成灰度图
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 使用高斯模糊减少噪声
blurred = cv2.GaussianBlur(gray, (7, 7), 0)
# 使用Canny边缘检测
edges = cv2.Canny(blurred, 50, 150)
# 膨胀边缘使轮廓更明显
dilated = cv2.dilate(edges, None, iterations=2)
cv2.imshow('Dilated Edges', dilated)
cv2.waitKey(0)
# 查找轮廓
contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 筛选可能的卡片轮廓
card_contours = []
min_card_area = 5000 # 最小卡片面积
max_card_area = 50000 # 最大卡片面积
for contour in contours:
area = cv2.contourArea(contour)
if min_card_area < area < max_card_area:
# 计算轮廓的主要特征
peri = cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, 0.02 * peri, True)
# 计算最小外接矩形
rect = cv2.minAreaRect(contour)
box = cv2.boxPoints(rect)
box = np.int0(box)
# 计算轮廓的形状特征
aspect_ratio = rect[1][0] / rect[1][1] if rect[1][1] != 0 else 0
# 检查是否符合卡片特征
if 0.5 < aspect_ratio < 2.0: # 合理的宽高比
card_contours.append(contour)
# 2. 创建卡片掩码
card_mask = np.zeros_like(gray)
cv2.drawContours(card_mask, card_contours, -1, (255), -1)
# 3. 检测黄色发光
# 定义黄色的HSV范围
lower_yellow = np.array([20, 100, 100])
upper_yellow = np.array([30, 255, 255])
# 创建黄色掩码
yellow_mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 4. 结合卡片掩码和黄色掩码
final_mask = cv2.bitwise_and(yellow_mask, card_mask)
# 5. 分析每张卡片
glow_scores = []
card_regions = []
for contour in card_contours:
# 获取卡片边界框
x, y, w, h = cv2.boundingRect(contour)
card_regions.append((x, y, x+w, y+h))
# 计算该区域内的发光得分
region_mask = final_mask[y:y+h, x:x+w]
score = np.sum(region_mask > 0)
glow_scores.append(score)
# 6. 找出发光卡片
if glow_scores:
glowing_card_index = np.argmax(glow_scores)
# 在原图上标记结果
result = img.copy()
for i, (x1, y1, x2, y2) in enumerate(card_regions):
color = (0, 255, 0) if i == glowing_card_index else (0, 0, 255)
cv2.rectangle(result, (x1, y1), (x2, y2), color, 2)
return {
'glowing_card_index': glowing_card_index,
'glow_scores': glow_scores,
'result_image': result,
'card_mask': card_mask,
'yellow_mask': yellow_mask,
'final_mask': final_mask
}
else:
return None
def display_results(results):
if results is None:
print("未检测到卡片")
return
# 显示所有处理步骤的结果
cv2.imshow('Original with Detection', results['result_image'])
cv2.imshow('Card Mask', results['card_mask'])
cv2.imshow('Yellow Mask', results['yellow_mask'])
cv2.imshow('Final Mask', results['final_mask'])
print(f"发光卡片序号: {results['glowing_card_index']}")
print(f"各卡片发光得分: {results['glow_scores']}")
cv2.waitKey(0)
cv2.destroyAllWindows()
def main():
image_path = r"C:\Users\user\Downloads\Snipaste_2024-12-26_10-11-58.png" # 替换为实际图像路径
results = detect_glowing_card(image_path)
display_results(results)
if __name__ == '__main__':
main()

View File

@ -1,216 +0,0 @@
import cv2
import numpy as np
# 使用其他 API 获取屏幕尺寸
import ctypes
user32 = ctypes.windll.user32
user32.SetProcessDPIAware()
screen = user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
ignore_show = False
def show(title, image):
img_h, img_w = image.shape[:2]
scale_x = screen[0] * 0.9 / img_w
scale_y = screen[1] * 0.9 / img_h
if scale_x < 1 or scale_y < 1:
scale = min(scale_x, scale_y)
resized = cv2.resize(image, (0, 0), fx=scale, fy=scale)
else:
resized = image
if not ignore_show:
cv2.imshow(title, resized)
cv2.waitKey(0)
cv2.destroyWindow(title)
# 读取图像
# image = cv2.imread(r"C:\Users\user\Downloads\1735195113729.jpg")
# image = cv2.imread(r"./test_images/1.jpg")
image = cv2.imread(r"./test_images/test1.png")
original = image.copy()
# 转换为灰度图像,并模糊以减少噪声
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# blurred = cv2.GaussianBlur(gray, (5, 5), 0)
# _, binary = cv2.threshold(original, 150, 255, cv2.THRESH_BINARY)
_, binary = cv2.threshold(gray, 190, 255, cv2.THRESH_BINARY)
kernel = np.ones((2, 2),np.uint8) # 定义卷积核大小,可以根据实际情况调整
erosion = cv2.erode(binary, kernel,iterations = 7)
dilation = cv2.dilate(binary, kernel,iterations = 10) # 膨胀操作
# closing = cv2.morphologyEx(dilation, cv2.MORPH_CLOSE, kernel) # 闭运算
# 边缘检测
edges = cv2.Canny(binary, 150, 300)
show("Binary", binary)
# show("erosion", erosion)
# show("dilation", dilation)
show("Edges", edges)
cv2.waitKey(0)
# 找到所有轮廓
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 封闭轮廓
# closed_contours = []
# for contour in contours:
# epsilon = 0.01 * cv2.arcLength(contour, True)
# approx = cv2.approxPolyDP(contour, epsilon, True)
# closed_contours.append(approx)
# 按轮廓长度排序只保留前10个
# contours = sorted(contours, key=lambda c: cv2.arcLength(c, False), reverse=True)[:10]
# # # 只保留直线轮廓
# filtered_contours = []
# for contour in contours:
# # 使用最小二乘法拟合直线
# [vx, vy, x, y] = cv2.fitLine(contour, cv2.DIST_L2, 0, 0.01, 0.01)
# # 计算轮廓的角度
# angle = np.arctan2(vy, vx) * 180 / np.pi
# # 如果角度接近水平或垂直,则保留该轮廓
# if abs(angle) < 30 or abs(angle - 90) < 30:
# filtered_contours.append(contour)
# contours = filtered_contours
# # 移除长度过短的轮廓
# min_contour_length = 150 # 设置最小轮廓长度阈值
# filtered_contours = [contour for contour in contours if cv2.arcLength(contour, True) > min_contour_length]
# contours = filtered_contours
# 绘制所有封闭轮廓
cv2.drawContours(original, contours, -1, (0, 255, 0), 2)
show("Contours", original)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 创建一个空白图像用于绘制轮廓
contour_image = np.zeros_like(image)
# 绘制所有轮廓
cv2.drawContours(contour_image, contours, -1, (255, 255, 255), 1)
# 显示只有轮廓的图像
show("Contours Only", contour_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 用于存储每张卡片的荧光值
card_glow_values = []
card_contours = []
# 遍历轮廓,筛选出卡片区域
for contour in contours:
area = cv2.contourArea(contour)
if area > 800: # 忽略小区域
# 近似轮廓形状
peri = cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, 0.02 * peri, True)
# 计算外接矩形
x, y, w, h = cv2.boundingRect(approx)
card_region = image[y:y+h, x:x+w]
# 绘制并展示该区域
# cv2.rectangle(original, (x, y), (x + w, y + h), (0, 0, 255), 2)
# show("Card Region pre", card_region)
# cv2.waitKey(0)
# 一起展示轮廓
# Create a tight black background for the contour
x, y, w, h = cv2.boundingRect(approx)
padding = 50
contour_img = np.zeros((h + padding*2, w + padding*2, 3), dtype=np.uint8)
# Adjust contour coordinates for padding
shifted_contour = approx.copy()
shifted_contour[:,:,0] = approx[:,:,0] - x + padding
shifted_contour[:,:,1] = approx[:,:,1] - y + padding
# Draw contour on black background
# cv2.drawContours(contour_img, [shifted_contour], -1, (0, 255, 0), 2)
# show("Contour on Black", contour_img)
# cv2.waitKey(0)
# 条件 1满足长宽比
aspect_ratio = w / float(h)
TARGET_ASPECT_RATIO_RANGE = (0.73, 0.80)
if not (TARGET_ASPECT_RATIO_RANGE[0] < aspect_ratio < TARGET_ASPECT_RATIO_RANGE[1]):
continue
# 条件 3颜色要求
# 提取区域右下角与左下角(40, 40)正方形区域的平均颜色
bottom_right = card_region[-40:, -40:]
bottom_left = card_region[-40:, :40]
avg_color_br = np.mean(bottom_right, axis=(0, 1))
avg_color_bl = np.mean(bottom_left, axis=(0, 1))
# 检查是否都近似 #f0f0f0
TARGET_COLOR = (240, 240, 240)
# 绘制并展示该区域
cv2.rectangle(original, (x, y), (x + w, y + h), (0, 0, 255), 2)
# 把颜色画上去
# 绘制并展示该区域
cv2.rectangle(original, (x, y), (x + w, y + h), (0, 0, 255), 2)
show("Card Region pre", card_region)
# preview = cv2.putText(original.copy(), str(avg_color_br), (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
# show("Card Region", preview)
# cv2.waitKey(0)
if not (
np.allclose(avg_color_br, TARGET_COLOR, atol=5)
and np.allclose(avg_color_bl, TARGET_COLOR, atol=5)
):
continue
if TARGET_ASPECT_RATIO_RANGE[0] < aspect_ratio < TARGET_ASPECT_RATIO_RANGE[1]:
# 提取卡片区域
card_region = image[y:y+h, x:x+w]
# 转换为 HSV 色彩空间
hsv = cv2.cvtColor(card_region, cv2.COLOR_BGR2HSV)
# 定义黄色的 HSV 阈值
lower_yellow = np.array([20, 100, 100])
upper_yellow = np.array([30, 255, 255])
# 创建遮罩,提取黄色区域
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 计算荧光值(黄色像素的总数)
glow_value = cv2.countNonZero(mask)
# 保存卡片轮廓和对应的荧光值
card_glow_values.append(glow_value)
card_contours.append((x, y, w, h))
# 绘制筛选后的边缘
for contour in card_contours:
x, y, w, h = contour
cv2.rectangle(original, (x, y), (x + w, y + h), (255, 0, 0), 5)
# 找到荧光值最高的卡片
if card_glow_values:
max_glow_index = np.argmax(card_glow_values)
max_glow_card = card_contours[max_glow_index]
# 绘制荧光值最高的卡片轮廓
x, y, w, h = max_glow_card
cv2.rectangle(original, (x, y), (x + w, y + h), (0, 255, 0), 5)
cv2.putText(original, "Max Glow", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# 显示结果
show("Detected Cards", original)
cv2.waitKey(0)
cv2.destroyAllWindows()

View File

@ -1,67 +0,0 @@
import cv2
import numpy as np
# 读取图像
# image = cv2.imread(r"C:\Users\user\Downloads\Snipaste_2024-12-26_10-11-58.png")
# image = cv2.imread(r"C:\Users\user\Downloads\1735194517471.jpg")
image = cv2.imread(r"C:\Users\user\Downloads\1735195113729.jpg")
original = image.copy()
# 转换为 HSV 色彩空间
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
# 定义黄色的 HSV 阈值
lower_yellow = np.array([20, 100, 100])
upper_yellow = np.array([60, 255, 255])
# 使用遮罩作为二值图像
binary = cv2.threshold(original, 220, 255, cv2.THRESH_BINARY)[1]
# 创建遮罩,提取黄色区域
mask = cv2.inRange(cv2.cvtColor(binary, cv2.COLOR_BGR2HSV), lower_yellow, upper_yellow)
# 获取图像高度和宽度
height, width = image.shape[:2]
# 计算每个区域的宽度
region_width = width // 3
# 创建三个区域的遮罩
regions = []
yellow_pixels = []
for i in range(3):
start_x = i * region_width
end_x = (i + 1) * region_width
region_mask = mask[:, start_x:end_x]
regions.append(region_mask)
yellow_pixels.append(cv2.countNonZero(region_mask))
# 找出黄色像素最多的区域
max_yellow_region = yellow_pixels.index(max(yellow_pixels))
# 截取对应区域的原图
start_x = max_yellow_region * region_width
end_x = (max_yellow_region + 1) * region_width
cropped_image = original[:, start_x:end_x]
# 显示含黄色最多的区域
cv2.imshow("Region with most yellow", cropped_image)
# 打印每个区域的黄色像素数量
for i, count in enumerate(yellow_pixels):
print(f"Region {i + 1}: {count} yellow pixels")
# 判断是否存在突出区域
max_count = max(yellow_pixels)
avg_count = sum(yellow_pixels) / len(yellow_pixels)
if max_count > avg_count * 1.5: # 如果最大值超过平均值的1.5倍
print(f"Region {yellow_pixels.index(max_count) + 1} has significantly more yellow pixels")
else:
print("No region has significantly more yellow pixels")
# 显示结果
cv2.imshow("Yellow Mask", mask)
cv2.imshow("Binary", binary)
cv2.waitKey(0)

View File

@ -1,29 +0,0 @@
from thefuzz import fuzz
import time
import random
import string
def generate_random_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
# 生成测试数据
count = 100000
test_strings = [(generate_random_string(10), generate_random_string(10)) for _ in range(count)]
# 测试普通字符串比较
start_time = time.time()
for s1, s2 in test_strings:
a = (s1 == s2)
str_compare_time = time.time() - start_time
# 测试 fuzz.ratio
start_time = time.time()
for s1, s2 in test_strings:
fuzz.ratio(s1, s2)
fuzz_time = time.time() - start_time
print(f"字符串比较耗时: {str_compare_time:.4f}")
print(f"fuzz.ratio耗时: {fuzz_time:.4f}")
print(f"fuzz.ratio比字符串比较慢 {fuzz_time/str_compare_time:.1f}")
print(fuzz.ratio("Da.レッスン", "Daレッスン"))

View File

@ -1,44 +0,0 @@
import cv2
import time
import hashlib
import numpy as np
def test_image_hash_time(iterations):
# 创建一个1920x1080的随机图像
img = np.random.randint(0, 256, (1080, 1920, 3), dtype=np.uint8)
total_time = 0
for i in range(iterations):
start_time = time.time()
# 计算图像的MD5
md5_hash = hashlib.md5(img.tobytes()).hexdigest()
end_time = time.time()
total_time += (end_time - start_time)
avg_time = total_time / iterations
print(f"平均计算时间: {avg_time:.6f}")
print(f"最后一次计算的MD5值: {md5_hash}")
def test_uuid_time(iterations):
import uuid
total_time = 0
for i in range(iterations):
start_time = time.time()
# 生成 UUID
uuid_str = str(uuid.uuid4())
end_time = time.time()
total_time += (end_time - start_time)
avg_time = total_time / iterations
print(f"平均生成UUID时间: {avg_time:.6f}")
print(f"最后一次生成的UUID: {uuid_str}")
if __name__ == "__main__":
test_image_hash_time(1000)
test_uuid_time(1000)

View File

@ -1,254 +0,0 @@
import cv2
from cv2.typing import MatLike
import numpy as np
from kotonebot.client.device.adb import AdbDevice
from adbutils import adb
from typing import NamedTuple
# 定义检测参数
TARGET_ASPECT_RATIO_RANGE = (0.73, 0.80)
TARGET_COLOR = (240, 240, 240)
YELLOW_LOWER = np.array([20, 100, 100])
YELLOW_UPPER = np.array([30, 255, 255])
GLOW_EXTENSION = 10 # 向外扩展的像素数
GLOW_THRESHOLD = 1200 # 荧光值阈值
import ctypes
user32 = ctypes.windll.user32
user32.SetProcessDPIAware()
screen = user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
class CardDetectResult(NamedTuple):
x: int
y: int
w: int
h: int
glow_value: int
is_target: bool
def show(title, image):
img_h, img_w = image.shape[:2]
scale_x = screen[0] * 0.8 / img_w
scale_y = screen[1] * 0.8 / img_h
if scale_x < 1 or scale_y < 1:
scale = min(scale_x, scale_y)
resized = cv2.resize(image, (0, 0), fx=scale, fy=scale)
else:
resized = image
cv2.imshow(title, resized)
# cv2.waitKey(0)
# cv2.destroyWindow(title)
# 添加固定的卡片坐标
# for: 1280x720
CARD_POSITIONS = [
(47, 883, 192, 252), # 左卡片 (x, y, w, h)
(264, 883, 192, 252), # 中卡片
(481, 883, 192, 252) # 右卡片
]
CARD_PAD = 25
CARD_SCREEN_PAD = 17
def calc_pos(count: int) -> list[tuple[int, int, int, int]]:
# 算法:根据 CARD_POSITIONS三张的情况
# 如果卡片数量过多导致无法保持原间距,则改为重叠布局
# 重叠时保持与屏幕两边间距为CARD_PAD
# 算出 count 张卡片的位置
# 如果只有一张卡片,直接返回中间位置
if count == 1:
middle_card = CARD_POSITIONS[1] # 取中间卡片位置
return [middle_card]
# 计算原始卡片间距
card_spacing = CARD_POSITIONS[1][0] - CARD_POSITIONS[0][0] # 相邻卡片x坐标之差
card_width = CARD_POSITIONS[0][2]
# 计算屏幕可用宽度(减去两边的padding)
screen_width = 720 # 使用最右卡片右边缘作为屏幕宽度
available_width = screen_width - (CARD_SCREEN_PAD * 2)
# 计算使用原始间距时的总宽度
original_total_width = (count - 1) * card_spacing + card_width
# 判断是否需要重叠布局
if original_total_width > available_width:
# 需要重叠布局
# 计算重叠距离 = (总宽度 - 可用宽度) / (卡片数量 - 1)
# overlap = (original_total_width - available_width) // (count - 1)
# spacing = card_width - overlap
spacing = (available_width - card_width * count - CARD_SCREEN_PAD * 2) // (count)
start_x = CARD_SCREEN_PAD
else:
# 使用原始间距,水平居中
spacing = card_spacing
start_x = (screen_width - original_total_width) // 2
# 生成所有卡片位置
positions = []
x = start_x
for i in range(count):
# y,w,h 保持不变,使用第一张卡的参数
y = CARD_POSITIONS[0][1]
w = CARD_POSITIONS[0][2]
h = CARD_POSITIONS[0][3]
positions.append((x, y, w, h))
x += spacing + card_width # 确保x是整数
# 四舍五入
positions = [(round(x), round(y), round(w), round(h)) for x, y, w, h in positions]
return positions
def detect_cards(image: MatLike, card_dimensions: list[tuple[int, int, int, int]]) -> list[CardDetectResult]:
card_contours = []
preview = image.copy()
# 圈出所有卡片预览
pv = image.copy()
# for x, y, w, h in CARD_POSITIONS:
# cv2.rectangle(pv, (x, y), (x+w, y+h), (0, 255, 0), 1)
# # 红色画出外围
# cv2.rectangle(pv, (x-GLOW_EXTENSION, y-GLOW_EXTENSION), (x+w+GLOW_EXTENSION, y+h+GLOW_EXTENSION), (0, 0, 255), 1)
# show("pv", pv)
for x, y, w, h in card_dimensions:
# 获取扩展后的卡片区域坐标
outer_x = max(0, x - GLOW_EXTENSION)
outer_y = max(0, y - GLOW_EXTENSION)
outer_w = w + (GLOW_EXTENSION * 2)
outer_h = h + (GLOW_EXTENSION * 2)
# 获取内外两个区域
outer_region = image[outer_y:y+h+GLOW_EXTENSION, outer_x:x+w+GLOW_EXTENSION]
inner_region = image[y:y+h, x:x+w]
# 创建掩码
outer_hsv = cv2.cvtColor(outer_region, cv2.COLOR_BGR2HSV)
inner_hsv = cv2.cvtColor(inner_region, cv2.COLOR_BGR2HSV)
# 计算外部区域的黄色部分
outer_mask = cv2.inRange(outer_hsv, YELLOW_LOWER, YELLOW_UPPER)
inner_mask = cv2.inRange(inner_hsv, YELLOW_LOWER, YELLOW_UPPER)
# 创建环形区域的掩码(仅计算扩展区域的荧光值)
ring_mask = outer_mask.copy()
ring_mask[GLOW_EXTENSION:GLOW_EXTENSION+h, GLOW_EXTENSION:GLOW_EXTENSION+w] = 0
# 计算环形区域的荧光值
glow_value = cv2.countNonZero(ring_mask)
card_contours.append(CardDetectResult(
x,
y,
w,
h,
glow_value,
glow_value > GLOW_THRESHOLD
))
# 在预览图像上画出内外区域
cv2.rectangle(preview, (outer_x, outer_y), (outer_x+outer_w, outer_y+outer_h), (0, 0, 255), 2)
cv2.rectangle(preview, (x, y), (x+w, y+h), (0, 255, 0), 2)
cv2.putText(preview, f"Glow: {glow_value}", (x, y-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2, cv2.LINE_AA)
if glow_value > GLOW_THRESHOLD: # 假设阈值为200
cv2.putText(preview, "TargetCard", (x, y+20),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
show("cards", preview)
cv2.waitKey(1)
return card_contours
def main():
# 初始化ADB设备
# adb.connect("127.0.0.1:16384")
# device = AdbDevice(adb.device_list()[0])
CARD_POSITIONS_4 = [
(17, 883, 192, 252),
(182, 883, 192, 252),
(346, 883, 192, 252),
(511, 883, 192, 252),
# delta_x = 165, delta_x-width = -27
]
while True:
# 获取屏幕截图
img = cv2.imread('tests/images/produce/recommended_card_4_3_0.png')
# 检测卡片
cards = detect_cards(img, CARD_POSITIONS_4)
# 如果检测到3个或更多卡片
if len(cards) >= 3:
print("检测到3个卡片")
# 在图像上绘制检测结果
for i, (x, y, w, h, glow, is_target) in enumerate(cards[:3]):
cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
cv2.putText(img, f"Card {i+1}", (x, y-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# 显示结果
cv2.imshow("Detected Cards", img)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
# break
# 等待1秒后继续检测
cv2.waitKey(1000)
from kotonebot.client.device.fast_screenshot import AdbFastScreenshots
with AdbFastScreenshots(
adb_path=r"D:\SDK\Android\platform-tools\adb.exe",
device_serial="127.0.0.1:16384",
time_interval=179,
width=720,
height=1280,
bitrate="5M",
use_busybox=False,
connect_to_device=True,
screenshotbuffer=10,
go_idle=0,
) as adbscreen:
pos_tobe_clicked = None
pos_clicked_count = 0
for image in adbscreen:
if pos_tobe_clicked is not None:
pos_clicked_count += 1
if pos_clicked_count >= 2:
pos_tobe_clicked = None
pos_clicked_count = 0
continue
device.click(*pos_tobe_clicked)
# 获取屏幕截图
img = image
# 检测卡片
cards = detect_cards(img, CARD_POSITIONS)
# 如果检测到3个或更多卡片
if len(cards) >= 3:
# print("检测到3个卡片")
# 在图像上绘制检测结果
for i, card in enumerate(cards[:3]):
cv2.rectangle(img, (card.x, card.y), (card.x+card.w, card.y+card.h), (0, 255, 0), 2)
cv2.putText(img, f"Card {i+1}", (card.x, card.y-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# 打印最大荧光值
print(f"最大荧光值: {max(card.glow_value for card in cards)}")
# 显示结果
# cv2.imshow("Detected Cards", img)
cv2.waitKey(1)
# 如果有则点击目标卡
if not pos_tobe_clicked and any(card.is_target for card in cards):
target_card = next(card for card in cards if card.is_target)
pos = (target_card.x + target_card.w // 2, target_card.y + target_card.h // 2)
print(f"点击位置: {pos}")
pos_tobe_clicked = pos
pos_clicked_count = 0
# TODO: 最终考试前练习不榨干体力
if __name__ == "__main__":
main()

View File

@ -1,121 +0,0 @@
import cv2
import numpy as np
from device.adb import AdbDevice
from adbutils import adb
# 初始化ADB设备
adb.connect("127.0.0.1:16384")
device = AdbDevice(adb.device_list()[0])
# 定义检测参数
TARGET_ASPECT_RATIO_RANGE = (0.73, 0.80)
TARGET_COLOR = (240, 240, 240)
YELLOW_LOWER = np.array([20, 100, 100])
YELLOW_UPPER = np.array([30, 255, 255])
import ctypes
user32 = ctypes.windll.user32
user32.SetProcessDPIAware()
screen = user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
def show(title, image):
img_h, img_w = image.shape[:2]
scale_x = screen[0] * 0.8 / img_w
scale_y = screen[1] * 0.8 / img_h
if scale_x < 1 or scale_y < 1:
scale = min(scale_x, scale_y)
resized = cv2.resize(image, (0, 0), fx=scale, fy=scale)
else:
resized = image
cv2.imshow(title, resized)
# cv2.waitKey(0)
# cv2.destroyWindow(title)
def detect_cards(image):
original = image.copy()
# 保存
cv2.imwrite("original.png", original)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
_, binary = cv2.threshold(gray, 190, 255, cv2.THRESH_BINARY)
edges = cv2.Canny(binary, 150, 300)
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
show("edges", edges)
card_contours = []
for i, contour in enumerate(contours):
area = cv2.contourArea(contour)
if area > 400:
peri = cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, 0.02 * peri, True)
x, y, w, h = cv2.boundingRect(approx)
# 展示裁剪后的图像
card_region = image[y:y+h, x:x+w]
show(f"card_region", card_region)
# 检查长宽比
aspect_ratio = w / float(h)
if not (TARGET_ASPECT_RATIO_RANGE[0] < aspect_ratio < TARGET_ASPECT_RATIO_RANGE[1]):
continue
# 检查颜色
card_region = image[y:y+h, x:x+w]
bottom_right = card_region[-40:, -40:]
bottom_left = card_region[-40:, :40]
avg_color_br = np.mean(bottom_right, axis=(0, 1))
avg_color_bl = np.mean(bottom_left, axis=(0, 1))
if not (np.allclose(avg_color_br, TARGET_COLOR, atol=5) and
np.allclose(avg_color_bl, TARGET_COLOR, atol=5)):
continue
# 计算黄色荧光值
hsv = cv2.cvtColor(card_region, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv, YELLOW_LOWER, YELLOW_UPPER)
glow_value = cv2.countNonZero(mask)
card_contours.append((x, y, w, h, glow_value))
# 在原图上画出所有轮廓并展示
# 按顺序画出所有轮廓
preview = image.copy()
for i, (x, y, w, h, glow_value) in enumerate(card_contours):
cv2.rectangle(preview, (x, y), (x+w, y+h), (0, 255, 0), 2)
cv2.putText(preview, f"Card {i+1}", (x, y-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2, cv2.LINE_AA)
show("cards", preview)
return card_contours
def main():
while True:
# 获取屏幕截图
img = device.screenshot()
# 检测卡片
cards = detect_cards(img)
# 如果检测到3个或更多卡片
if len(cards) >= 3:
print("检测到3个卡片")
# 在图像上绘制检测结果
for i, (x, y, w, h, glow) in enumerate(cards[:3]):
cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
cv2.putText(img, f"Card {i+1}", (x, y-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# 显示结果
cv2.imshow("Detected Cards", img)
cv2.waitKey(0)
cv2.destroyAllWindows()
break
# 等待1秒后继续检测
cv2.waitKey(1000)
if __name__ == "__main__":
main()

View File

@ -1,49 +0,0 @@
import cv2
import numpy as np
# 读取图像
template = cv2.imread('tests/images/pdorinku.png')
mask = cv2.imread('test_mask.png')
image = cv2.imread('tests/images/acquire_pdorinku.png')
# 打印大小
print(template.shape)
print(mask.shape)
print(image.shape)
# 将掩码二值化
# mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)[1]
# 反转
# 转换掩码为单通道灰度图
mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
# mask = cv2.bitwise_not(mask)
cv2.imshow('mask', mask)
# 展示 masked 模板
# 确保掩码和模板大小一致
# mask = cv2.resize(mask, (template.shape[1], template.shape[0]))
masked_template = cv2.bitwise_and(template, template, mask=mask)
cv2.imshow('masked_template', masked_template)
# 模板匹配
result = cv2.matchTemplate(image, template, cv2.TM_CCORR_NORMED, mask=mask)
cv2.imshow('result', result)
# 获取最佳匹配位置
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
# 获取模板尺寸
h, w = template.shape[:2]
# 在原图上绘制矩形标注结果
top_left = max_loc
bottom_right = (top_left[0] + w, top_left[1] + h)
cv2.rectangle(image, top_left, bottom_right, (0, 0, 255), 2)
# 显示结果
# 缩放 1/2
image = cv2.resize(image, (0, 0), fx=0.5, fy=0.5)
cv2.imshow('Result', image)
cv2.waitKey(0)
cv2.destroyAllWindows()

View File

@ -13,15 +13,18 @@ from .backend.context import (
sleep,
task,
action,
use_screenshot
)
from .backend.util import (
Rect,
grayscaled,
grayscale_cached,
cropped,
UnrecoverableError,
AdaptiveWait,
Countdown,
Interval,
until,
crop_rect,
)
from .backend.color import (
hsv_cv2web,

View File

@ -7,6 +7,7 @@ from datetime import datetime
from threading import Event
from typing import (
Callable,
Optional,
cast,
overload,
Any,
@ -24,7 +25,7 @@ import cv2
from cv2.typing import MatLike
from kotonebot.client.protocol import DeviceABC
from kotonebot.backend.util import Rect, KotonebotWarning
from kotonebot.backend.util import Rect
import kotonebot.backend.image as raw_image
from kotonebot.client.device.adb import AdbDevice
from kotonebot.backend.image import (
@ -36,6 +37,7 @@ from kotonebot.backend.image import (
find_multi,
find_all,
find_all_multi,
count
)
import kotonebot.backend.color as raw_color
from kotonebot.backend.color import find_rgb
@ -43,6 +45,7 @@ from kotonebot.backend.ocr import Ocr, OcrResult, OcrResultList, jp, en, StringM
from kotonebot.config.manager import load_config, save_config
from kotonebot.config.base_config import UserConfig
from kotonebot.backend.core import Image, HintBox
from kotonebot.errors import KotonebotWarning
OcrLanguage = Literal['jp', 'en']
ScreenshotMode = Literal['auto', 'manual', 'manual-inherit']
@ -183,6 +186,8 @@ class ContextStackVars:
"""
self._screenshot: MatLike | None = None
"""截图数据"""
self._inherit_screenshot: MatLike | None = None
"""继承的截图数据"""
@property
def screenshot(self) -> MatLike:
@ -204,7 +209,7 @@ class ContextStackVars:
vars.screenshot_mode = screenshot_mode
current = ContextStackVars.current()
if current and vars.screenshot_mode == 'manual-inherit':
vars._screenshot = current._screenshot
vars._inherit_screenshot = current._screenshot
ContextStackVars.stack.append(vars)
return vars
@ -495,6 +500,9 @@ class ContextImage:
def find_all_crop(self, *args, **kwargs):
return find_all_crop(ContextStackVars.ensure_current().screenshot, *args, **kwargs)
@context(count)
def count(self, *args, **kwargs):
return count(ContextStackVars.ensure_current().screenshot, *args, **kwargs)
@interruptible_class
class ContextColor:
@ -610,17 +618,31 @@ class ContextDevice(DeviceABC):
def __init__(self, device: DeviceABC):
self._device = device
@deprecated('使用 device.screenshot() 代替')
def update_screenshot(self):
ContextStackVars.ensure_current()._screenshot = self._device.screenshot()
return self.screenshot()
def screenshot(self):
"""
截图返回截图数据同时更新当前上下文的截图数据
"""
current = ContextStackVars.ensure_current()
if current._inherit_screenshot is not None:
img = current._inherit_screenshot
current._inherit_screenshot = None
else:
img = self._device.screenshot()
current._screenshot = img
return img
def __getattribute__(self, name: str) -> Any:
if name in ['update_screenshot', '_device']:
if name in ['update_screenshot', '_device', 'screenshot']:
return object.__getattribute__(self, name)
else:
return getattr(self._device, name)
def __setattr__(self, name: str, value: Any):
if name in ['update_screenshot', '_device']:
if name in ['update_screenshot', '_device', 'screenshot']:
return object.__setattr__(self, name, value)
else:
return setattr(self._device, name, value)
@ -642,8 +664,34 @@ class Context(Generic[T]):
d = [d for d in adb.device_list() if d.serial == f'{ip}:{port}']
self.__device = ContextDevice(AdbDevice(d[0]))
def inject_device(self, device: DeviceABC):
self.__device = ContextDevice(device)
def inject(
self,
*,
device: Optional[ContextDevice | DeviceABC] = None,
ocr: Optional[ContextOcr] = None,
image: Optional[ContextImage] = None,
color: Optional[ContextColor] = None,
vars: Optional[ContextGlobalVars] = None,
debug: Optional[ContextDebug] = None,
config: Optional[ContextConfig] = None,
):
if device is not None:
if isinstance(device, DeviceABC):
self.__device = ContextDevice(device)
else:
self.__device = device
if ocr is not None:
self.__ocr = ocr
if image is not None:
self.__image = image
if color is not None:
self.__color = color
if vars is not None:
self.__vars = vars
if debug is not None:
self.__debug = debug
if config is not None:
self.__config = config
@property
def device(self) -> ContextDevice:
@ -679,6 +727,13 @@ def rect_expand(rect: Rect, left: int = 0, top: int = 0, right: int = 0, bottom:
"""
return (rect[0] - left, rect[1] - top, rect[2] + right + left, rect[3] + bottom + top)
def use_screenshot(*args: MatLike | None) -> MatLike:
for img in args:
if img is not None:
ContextStackVars.ensure_current()._screenshot = img # HACK
return img
return device.screenshot()
# 这里 Context 类还没有初始化,但是 tasks 中的脚本可能已经引用了这里的变量
# 为了能够动态更新这里变量的值,这里使用 Forwarded 类再封装一层,
# 将调用转发到实际的稍后初始化的 Context 类上
@ -726,6 +781,22 @@ def init_context(
debug._FORWARD_getter = lambda: _c.debug # type: ignore
config._FORWARD_getter = lambda: _c.config # type: ignore
def inject_context(
*,
device: Optional[ContextDevice | DeviceABC] = None,
ocr: Optional[ContextOcr] = None,
image: Optional[ContextImage] = None,
color: Optional[ContextColor] = None,
vars: Optional[ContextGlobalVars] = None,
debug: Optional[ContextDebug] = None,
config: Optional[ContextConfig] = None,
):
global _c
if _c is None:
raise RuntimeError('Context not initialized')
_c.inject(device=device, ocr=ocr, image=image, color=color, vars=vars, debug=debug, config=config)
class ManualContextManager:
def __init__(self, screenshot_mode: ScreenshotMode = 'auto'):
self.screenshot_mode: ScreenshotMode = screenshot_mode
@ -736,6 +807,12 @@ class ManualContextManager:
def __exit__(self, exc_type, exc_value, traceback):
ContextStackVars.pop()
def begin(self):
self.__enter__()
def end(self):
self.__exit__(None, None, None)
def manual_context(screenshot_mode: ScreenshotMode = 'auto') -> ManualContextManager:
"""
默认情况下Context* 类仅允许在 @task/@action 函数中使用

View File

@ -5,10 +5,10 @@ from typing import Callable, ParamSpec, TypeVar, overload, TYPE_CHECKING
import cv2
from cv2.typing import MatLike
from kotonebot.errors import ResourceFileMissingError
if TYPE_CHECKING:
from kotonebot.backend.util import Rect
class Ocr:
def __init__(
self,
@ -31,7 +31,7 @@ class Image:
):
self.path = path
self.name = name
self.__data = data
self.__data: MatLike | None = data
self.__data_with_alpha: MatLike | None = None
@property
@ -40,6 +40,9 @@ class Image:
if self.path is None:
raise ValueError('Either path or data must be provided.')
self.__data = cv2.imread(self.path)
if self.__data is None:
raise ResourceFileMissingError(self.path, 'sprite')
logger.debug(f'Read image "{self.name}" from {self.path}')
return self.__data
@property
@ -48,6 +51,9 @@ class Image:
if self.path is None:
raise ValueError('Either path or data must be provided.')
self.__data_with_alpha = cv2.imread(self.path, cv2.IMREAD_UNCHANGED)
if self.__data_with_alpha is None:
raise ResourceFileMissingError(self.path, 'sprite with alpha')
logger.debug(f'Read image "{self.name}" from {self.path}')
return self.__data_with_alpha
def __repr__(self) -> str:
@ -67,7 +73,9 @@ class HintBox(tuple[int, int, int, int]):
*,
source_resolution: tuple[int, int],
):
return super().__new__(cls, [x1, y1, x2, y2])
w = x2 - x1
h = y2 - y1
return super().__new__(cls, [x1, y1, w, h])
def __init__(
self,

View File

@ -0,0 +1,51 @@
import time
import cv2
from kotonebot.client.protocol import DeviceABC
from kotonebot import sleep
class Video:
def __init__(self, path: str, fps: int):
self.path = path
self.fps = fps
self.paused = False
"""是否暂停"""
self.__cap = cv2.VideoCapture(path)
self.__last_frame = None
self.__last_time = 0
def __iter__(self):
return self
def __next__(self):
if self.paused:
return self.__last_frame
ret, frame = self.__cap.read()
if not ret:
raise StopIteration
self.__last_frame = frame
self.__last_time = time.time()
if self.__last_time - time.time() < 1 / self.fps:
sleep(1 / self.fps)
return frame
def pause(self):
self.paused = True
def resume(self):
self.paused = False
class MockDevice(DeviceABC):
def __init__(
self
):
self.__video_stream = None
def load_video(self, path: str, fps: int):
self.__video_stream = Video(path, fps)
return self.__video_stream
def screenshot(self):
if self.__video_stream:
return next(self.__video_stream)
else:
raise RuntimeError('No video stream loaded')

View File

@ -1,14 +1,18 @@
import time
import uuid
import logging
import inspect
from logging import Logger
from types import CodeType
from dataclasses import dataclass
from typing import Annotated, Any, Callable, Concatenate, TypeVar, ParamSpec, Literal, Protocol, cast
from typing_extensions import Self
from dataclasses import dataclass
from .context import ContextColor
from .core import Image, Ocr
from kotonebot.backend.ocr import StringMatchFunction
from .core import Image
logger = logging.getLogger(__name__)
P = ParamSpec('P')
@ -109,6 +113,122 @@ def dispatcher(
else:
return wrapper
@dataclass
class ClickParams:
finish: bool = False
log: str | None = None
class Click:
def __init__(self, sd: 'SimpleDispatcher', target: Image | str | StringMatchFunction | Literal['center'], *, params: ClickParams = ClickParams()):
self.target = target
self.params = params
self.sd = sd
def __call__(self):
from kotonebot import device
if self.params.log:
self.sd.logger.info(self.params.log)
device.click_center()
if self.params.finish:
self.sd.finished = True
class ClickImage:
def __init__(self, sd: 'SimpleDispatcher', image: Image, *, params: ClickParams = ClickParams()):
self.image = image
self.params = params
self.sd = sd
def __call__(self):
from kotonebot import device, image
if image.find(self.image):
if self.params.log:
self.sd.logger.info(self.params.log)
device.click()
if self.params.finish:
self.sd.finished = True
class ClickImageAny:
def __init__(self, sd: 'SimpleDispatcher', images: list[Image], params: ClickParams = ClickParams()):
self.images = images
self.params = params
self.sd = sd
def __call__(self):
from kotonebot import device, image
if image.find_multi(self.images):
if self.params.log:
self.sd.logger.info(self.params.log)
device.click()
if self.params.finish:
self.sd.finished = True
class ClickText:
def __init__(
self,
sd: 'SimpleDispatcher',
text: str | StringMatchFunction,
params: ClickParams = ClickParams()
):
self.text = text
self.params = params
self.sd = sd
def __call__(self):
from kotonebot import device, ocr
if ocr.find(self.text):
if self.params.log:
self.sd.logger.info(self.params.log)
device.click()
if self.params.finish:
self.sd.finished = True
class SimpleDispatcher:
def __init__(self, name: str, *, interval: float = 0.2):
self.name = name
self.logger = logging.getLogger(f'SimpleDispatcher of {name}')
self.blocks: list[Callable] = []
self.finished: bool = False
self.interval = interval
self.__last_run_time: float = 0
def click(
self,
target: Image | str | StringMatchFunction | Literal['center'],
*,
finish: bool = False,
log: str | None = None
):
params = ClickParams(finish=finish, log=log)
if isinstance(target, Image):
self.blocks.append(ClickImage(self, target, params=params))
else:
self.blocks.append(ClickText(self, target, params=params))
return self
def click_any(
self,
target: list[Image],
*,
finish: bool = False,
log: str | None = None
):
params = ClickParams(finish=finish, log=log)
self.blocks.append(ClickImageAny(self, target, params))
return self
def run(self):
from kotonebot import device, sleep
while True:
time_delta = time.time() - self.__last_run_time
if time_delta < self.interval:
sleep(self.interval - time_delta)
for block in self.blocks:
block()
if self.finished:
break
self.__last_run_time = time.time()
device.screenshot()
if __name__ == '__main__':
from .context.task_action import action
from .context import init_context

View File

@ -342,10 +342,10 @@ def find_all_crop(
remove_duplicate=remove_duplicate,
colored=colored,
)
logger.debug(
f'find_all_crop(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
f'matches: {_results2str(matches)}'
)
# logger.debug(
# f'find_all_crop(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
# f'matches: {_results2str(matches)}'
# )
return [CropResult(
match.score,
match.position,
@ -386,10 +386,10 @@ def find(
remove_duplicate=remove_duplicate,
colored=colored,
)
logger.debug(
f'find(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
f'matches: {_results2str(matches)}'
)
# logger.debug(
# f'find(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
# f'matches: {_results2str(matches)}'
# )
# 调试输出
if debug.enabled and debug_output:
result_image = _draw_result(image, matches)
@ -436,10 +436,10 @@ def find_all(
remove_duplicate=remove_duplicate,
colored=colored,
)
logger.debug(
f'find_all(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
f'matches: {_results2str(results)}'
)
# logger.debug(
# f'find_all(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
# f'matches: {_results2str(results)}'
# )
if debug.enabled and debug_output:
result_image = _draw_result(image, results)
debug_result(
@ -452,8 +452,8 @@ def find_all(
def find_multi(
image: MatLike,
templates: list[MatLike | str | Image],
masks: list[MatLike | str | Image | None] | None = None,
templates: Sequence[MatLike | str | Image],
masks: Sequence[MatLike | str | Image | None] | None = None,
*,
transparent: bool = False,
threshold: float = 0.8,
@ -496,10 +496,10 @@ def find_multi(
index=index
)
break
logger.debug(
f'find_multi(): templates: {_imgs2str(templates)} images: {_img2str(image)} masks: {_imgs2str(masks)} '
f'result: {_result2str(ret)}'
)
# logger.debug(
# f'find_multi(): templates: {_imgs2str(templates)} images: {_img2str(image)} masks: {_imgs2str(masks)} '
# f'result: {_result2str(ret)}'
# )
if debug.enabled:
msg = (
"<table class='result-table'>" +
@ -567,10 +567,10 @@ def find_all_multi(
MultipleTemplateMatchResult.from_template_match_result(r, index)
for r in results
])
logger.debug(
f'find_all_multi(): templates: {_imgs2str(templates)} images: {_img2str(image)} masks: {_imgs2str(masks)} '
f'result: {_results2str(ret)}'
)
# logger.debug(
# f'find_all_multi(): templates: {_imgs2str(templates)} images: {_img2str(image)} masks: {_imgs2str(masks)} '
# f'result: {_results2str(ret)}'
# )
if debug.enabled:
# 参数表格
msg = (
@ -631,10 +631,10 @@ def count(
remove_duplicate=remove_duplicate,
colored=colored,
)
logger.debug(
f'count(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
f'result: {_results2str(results)}'
)
# logger.debug(
# f'count(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
# f'result: {_results2str(results)}'
# )
if debug.enabled:
result_image = _draw_result(image, results)
debug_result(
@ -681,10 +681,10 @@ def expect(
remove_duplicate=remove_duplicate,
debug_output=False,
)
logger.debug(
f'expect(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
f'result: {_result2str(ret)}'
)
# logger.debug(
# f'expect(): template: {_img2str(template)} image: {_img2str(image)} mask: {_img2str(mask)} '
# f'result: {_result2str(ret)}'
# )
if debug.enabled:
debug_result(
'image.expect',
@ -716,10 +716,10 @@ def similar(
image1 = cv2.cvtColor(image1, cv2.COLOR_BGR2GRAY)
image2 = cv2.cvtColor(image2, cv2.COLOR_BGR2GRAY)
result = structural_similarity(image1, image2, multichannel=True)
logger.debug(
f'similar(): image1: {_img2str(image1)} image2: {_img2str(image2)} '
f'result: {result}'
)
# logger.debug(
# f'similar(): image1: {_img2str(image1)} image2: {_img2str(image2)} '
# f'result: {result}'
# )
# 调试输出
if debug.enabled:
result_image = np.hstack([image1, image2])

View File

@ -2,6 +2,8 @@ import re
import logging
import unicodedata
from functools import lru_cache
from dataclasses import dataclass
from typing_extensions import Self
from typing import Callable, NamedTuple
import cv2
@ -32,7 +34,8 @@ _engine_en = RapidOCR(
StringMatchFunction = Callable[[str], bool]
REGEX_NUMBERS = re.compile(r'\d+')
class OcrResult(NamedTuple):
@dataclass
class OcrResult:
text: str
rect: Rect
confidence: float
@ -40,6 +43,13 @@ class OcrResult(NamedTuple):
def __repr__(self) -> str:
return f'OcrResult(text="{self.text}", rect={self.rect}, confidence={self.confidence})'
def replace(self, old: str, new: str, count: int = -1) -> Self:
"""
替换识别结果中的文本
"""
self.text = self.text.replace(old, new, count)
return self
def regex(self, pattern: re.Pattern | str) -> list[str]:
"""
提取识别结果中符合正则表达式的文本
@ -55,10 +65,12 @@ class OcrResult(NamedTuple):
return [int(x) for x in REGEX_NUMBERS.findall(self.text)]
class OcrResultList(list[OcrResult]):
def squash(self) -> OcrResult:
def squash(self, remove_newlines: bool = True) -> OcrResult:
"""
将所有识别结果合并为一个大结果
"""
if not self:
return OcrResult('', (0, 0, 0, 0), 0)
text = [r.text for r in self]
confidence = sum(r.confidence for r in self) / len(self)
points = []
@ -68,13 +80,15 @@ class OcrResultList(list[OcrResult]):
points.append((r.rect[0], r.rect[1] + r.rect[3]))
points.append((r.rect[0] + r.rect[2], r.rect[1] + r.rect[3]))
rect = bounding_box(points)
text = '\n'.join(text)
if remove_newlines:
text = text.replace('\n', '')
return OcrResult(
text='\n'.join(text),
text=text,
rect=rect,
confidence=confidence,
)
def first(self) -> OcrResult | None:
"""
返回第一个识别结果
@ -202,16 +216,26 @@ def pad_to(img: MatLike, target_size: tuple[int, int], rgb: tuple[int, int, int]
img = cv2.resize(img, (new_w, new_h))
# 创建目标画布并填充
ret = np.full((th, tw, 3), rgb, dtype=np.uint8)
if len(img.shape) == 2:
# 灰度图像
ret = np.full((th, tw), rgb[0], dtype=np.uint8)
else:
# RGB图像
ret = np.full((th, tw, 3), rgb, dtype=np.uint8)
# 计算需要填充的宽高
pad_h = th - new_h
pad_w = tw - new_w
# 将缩放后的图像居中放置
ret[
pad_h // 2:pad_h // 2 + new_h,
pad_w // 2:pad_w // 2 + new_w, :] = img
if len(img.shape) == 2:
ret[
pad_h // 2:pad_h // 2 + new_h,
pad_w // 2:pad_w // 2 + new_w] = img
else:
ret[
pad_h // 2:pad_h // 2 + new_h,
pad_w // 2:pad_w // 2 + new_w, :] = img
return ret
def _draw_result(image: 'MatLike', result: list[OcrResult]) -> 'MatLike':
@ -308,7 +332,7 @@ class Ocr:
# https://blog.csdn.net/YY007H/article/details/124973777
original_img = img.copy()
img = pad_to(img, (631, 631))
img_content = grayscaled(img)
img_content = img
result, elapse = self.__engine(img_content)
if result is None:
return OcrResultList()
@ -323,9 +347,13 @@ class Ocr:
ret = OcrResultList(ret)
if debug.enabled:
result_image = _draw_result(img, ret)
elapse = elapse or [0, 0, 0]
debug_result(
'ocr',
[result_image, original_img],
f"pad={pad}\n" + \
f"rect={rect}\n" + \
f"elapsed: det={elapse[0]:.3f}s cls={elapse[1]:.3f}s rec={elapse[2]:.3f}s\n" + \
f"result: \n" + \
"<table class='result-table'><tr><th>Text</th><th>Confidence</th></tr>" + \
"\n".join([f"<tr><td>{r.text}</td><td>{r.confidence:.2f}</td></tr>" for r in ret]) + \

View File

@ -4,7 +4,6 @@ import pstats
import typing
import logging
import cProfile
from time import sleep
from importlib import resources
from functools import lru_cache
from typing import Literal, Callable, TYPE_CHECKING
@ -20,8 +19,6 @@ from .core import Image
logger = logging.getLogger(__name__)
class UnrecoverableError(Exception):
pass
Rect = typing.Sequence[int]
"""左上X, 左上Y, 宽度, 高度"""
@ -29,8 +26,6 @@ Rect = typing.Sequence[int]
def is_rect(rect: typing.Any) -> bool:
return isinstance(rect, typing.Sequence) and len(rect) == 4 and all(isinstance(i, int) for i in rect)
def crop(img: MatLike, /, x1: float = 0, y1: float = 0, x2: float = 1, y2: float = 1) -> MatLike:
"""
按比例裁剪图像
@ -48,6 +43,16 @@ def crop(img: MatLike, /, x1: float = 0, y1: float = 0, x2: float = 1, y2: float
y2_px = int(h * y2)
return img[y1_px:y2_px, x1_px:x2_px]
def crop_rect(img: MatLike, rect: Rect) -> MatLike:
"""
按范围裁剪图像
:param img: 图像
:param rect: 裁剪区域
"""
x, y, w, h = rect
return img[y:y+h, x:x+w]
class DeviceHookContextManager:
def __init__(
self,
@ -175,6 +180,7 @@ class AdaptiveWait:
self.reset()
def __call__(self):
from .context import sleep
if self.start_time is None:
self.start_time = time.time()
sleep(self.interval)
@ -186,6 +192,37 @@ class AdaptiveWait:
self.interval = self.base_interval
self.start_time = None
class Countdown:
def __init__(self, seconds: float):
self.seconds = seconds
self.start_time = time.time()
def __str__(self):
return f"{self.seconds - (time.time() - self.start_time):.0f}s"
def start(self):
self.start_time = time.time()
def expired(self) -> bool:
return time.time() - self.start_time > self.seconds
def reset(self):
self.start_time = time.time()
class Interval:
def __init__(self, seconds: float):
self.seconds = seconds
self.start_time = time.time()
self.last_wait_time = 0
def wait(self):
from .context import sleep
delta = time.time() - self.start_time
if delta < self.seconds:
sleep(self.seconds - delta)
self.last_wait_time = time.time() - self.start_time
self.start_time = time.time()
package_mode: Literal['wheel', 'standalone'] | None = None
def res_path(path: str) -> str:
"""
@ -207,7 +244,6 @@ def res_path(path: str) -> str:
# 但是 path 已经有了 res所以这里需要去掉 res
real_path = resources.files('kotonebot.res') / '..' / path
ret = str(real_path)
logger.debug(f'res_path: {ret}')
return ret
class Profiler:
@ -247,5 +283,15 @@ class Profiler:
def end(self):
self.__exit__(None, None, None)
class KotonebotWarning(Warning):
pass
def snakeviz(self) -> bool:
if self.stats is None:
logger.warning("Profiler still running. Exit/End Profiler before run snakeviz.")
return False
try:
from snakeviz import cli
cli.main([os.path.abspath(self.file_path)])
return True
except ImportError:
logger.warning("snakeviz is not installed")
return False

View File

@ -147,21 +147,21 @@ class DeviceABC(ABC):
self.click(x, y)
@overload
def double_click(self, x: int, y: int, interval: float = 0.5) -> None:
def double_click(self, x: int, y: int, interval: float = 0.25) -> None:
"""
双击屏幕上的某个点
"""
...
@overload
def double_click(self, rect: Rect, interval: float = 0.5) -> None:
def double_click(self, rect: Rect, interval: float = 0.25) -> None:
"""
双击屏幕上的某个矩形区域
"""
...
@overload
def double_click(self, clickable: ClickableObjectProtocol, interval: float = 0.5) -> None:
def double_click(self, clickable: ClickableObjectProtocol, interval: float = 0.25) -> None:
"""
双击屏幕上的某个可点击对象
"""
@ -171,14 +171,14 @@ class DeviceABC(ABC):
arg0 = args[0]
if is_rect(arg0) or isinstance(arg0, ClickableObjectProtocol):
rect = arg0
interval = kwargs.get('interval', 0.5)
interval = kwargs.get('interval', 0.25)
self.click(rect)
sleep(interval)
self.click(rect)
else:
x = args[0]
y = args[1]
interval = kwargs.get('interval', 0.5)
interval = kwargs.get('interval', 0.25)
self.click(x, y)
sleep(interval)
self.click(x, y)

11
kotonebot/errors.py Normal file
View File

@ -0,0 +1,11 @@
class UnrecoverableError(Exception):
pass
class ResourceFileMissingError(Exception):
def __init__(self, file_path: str, description: str):
self.file_path = file_path
self.description = description
super().__init__(f'Resource file ({description}) "{file_path}" is missing.')
class KotonebotWarning(Warning):
pass

View File

@ -6,24 +6,24 @@ from kotonebot import (
device,
contains,
image,
grayscaled,
grayscale_cached,
action,
sleep
sleep,
Interval,
)
from kotonebot.backend.dispatch import SimpleDispatcher
from kotonebot.tasks.actions.commu import check_and_skip_commu
from .loading import loading, wait_loading_end
from .. import R
from .pdorinku import acquire_pdorinku
logger = getLogger(__name__)
@action('领取技能卡')
@action('领取技能卡', screenshot_mode='manual-inherit')
def acquire_skill_card():
"""获取技能卡(スキルカード)"""
# TODO: 识别卡片内容,而不是固定选卡
# TODO: 不硬编码坐标
logger.debug("Locating all skill cards...")
device.screenshot()
cards = image.find_all_multi([
R.InPurodyuusu.A,
R.InPurodyuusu.M
@ -32,16 +32,42 @@ def acquire_skill_card():
logger.info(f"Found {len(cards)} skill cards")
logger.debug("Click first skill card")
device.click(cards[0].rect)
sleep(1)
# # 确定
# logger.debug("Click 受け取る")
# device.click(ocr.expect(contains("受け取る")).rect)
# # 跳过动画
# device.click(image.expect_wait_any([
# R.InPurodyuusu.PSkillCardIconBlue,
# R.InPurodyuusu.PSkillCardIconColorful
# ], timeout=60))
device.screenshot()
(SimpleDispatcher('acquire_skill_card')
.click(contains("受け取る"), finish=True, log="Skill card #1 acquired")
# .click_any([
# R.InPurodyuusu.PSkillCardIconBlue,
# R.InPurodyuusu.PSkillCardIconColorful
# ], finish=True, log="Skill card #1 acquired")
).run()
# logger.info("Skill card #1 acquired")
@action('选择P物品', screenshot_mode='auto')
def select_p_item():
"""
前置条件P物品选择对话框受け取るPアイテムを選んでください;\n
结束条件P物品获取动画
"""
# 前置条件 [screenshots/produce/in_produce/select_p_item.png]
# 前置条件 [screenshots/produce/in_produce/claim_p_item.png]
POSTIONS = [
(157, 820, 128, 128), # x, y, w, h
(296, 820, 128, 128),
(435, 820, 128, 128),
] # TODO: HARD CODED
device.click(POSTIONS[0])
sleep(0.5)
# 确定
logger.debug("Click 受け取る")
device.click(ocr.expect(contains("受け取る")).rect)
# 跳过动画
device.click(image.expect_wait_any([
R.InPurodyuusu.PSkillCardIconBlue,
R.InPurodyuusu.PSkillCardIconColorful
], timeout=60))
logger.info("Skill card #1 acquired")
device.click(ocr.expect_wait('受け取る'))
AcquisitionType = Literal[
"PDrinkAcquire", # P饮料被动领取
@ -51,79 +77,54 @@ AcquisitionType = Literal[
"PSkillCardChange", # 技能卡更换
"PSkillCardSelect", # 技能卡选择
"PSkillCardEnhance", # 技能卡强化
"PItem", # P物品
"PItemClaim", # P物品领取
"PItemSelect", # P物品选择
"Clear", # 目标达成
"NetworkError", # 网络中断弹窗
"SkipCommu", # 跳过交流
]
@action('检测并领取奖励')
# TODO: 这个函数可能要换个更好的名字
@action('处理培育事件', screenshot_mode='manual')
def acquisitions() -> AcquisitionType | None:
"""处理行动开始前和结束后可能需要处理的事件,直到到行动页面为止"""
img = device.screenshot_raw()
img = device.screenshot()
screen_size = device.screen_size
gray_img = grayscaled(img)
ocr_results = ocr.raw().ocr(img)
ocr_text = ''.join(r.text for r in ocr_results)
bottom_pos = (int(screen_size[0] * 0.5), int(screen_size[1] * 0.7)) # 底部中间
logger.info("Acquisition stuffs...")
# P饮料被动领取
if image.raw().find(img, R.InPurodyuusu.PDrinkIcon):
# P饮料领取
logger.debug("Check PDrink acquire...")
if image.find(R.InPurodyuusu.PDrinkIcon):
logger.info("PDrink acquire found")
device.click_center()
sleep(1)
return "PDrinkAcquire"
# P饮料主动领取
# if ocr.raw().find(img, contains("受け取るPドリンクを選れでください")):
if image.raw().find(img, R.InPurodyuusu.TextPleaseSelectPDrink):
logger.info("PDrink select found")
acquire_pdorinku(index=0)
return "PDrinkSelect"
# P饮料到达上限
if image.raw().find(img, R.InPurodyuusu.TextPDrinkMax):
logger.debug("Check PDrink max...")
if image.find(R.InPurodyuusu.TextPDrinkMax):
logger.info("PDrink max found")
device.click(image.expect(R.InPurodyuusu.ButtonLeave))
sleep(0.7)
# 可能需要点击确认
device.click(image.expect(R.Common.ButtonConfirm, threshold=0.8))
while True:
if image.find(R.InPurodyuusu.ButtonLeave, colored=True):
device.click()
elif image.find(R.Common.ButtonConfirm):
device.click()
break
device.screenshot()
return "PDrinkMax"
# 技能卡被动领取(支援卡效果)
logger.info("Check skill card acquisition...")
if image.raw().find_multi(img, [
# 技能卡领取
logger.debug("Check skill card acquisition...")
if image.find_multi([
R.InPurodyuusu.PSkillCardIconBlue,
R.InPurodyuusu.PSkillCardIconColorful
]):
logger.info("Acquire skill card found")
device.click_center()
return "PSkillCardAcquire"
# 技能卡更换(支援卡效果)
# [screenshots/produce/in_produce/support_card_change.png]
if 'チェンジ' in ocr_text:
logger.info("Change skill card found")
device.click(*bottom_pos)
return "PSkillCardChange"
# 技能卡强化
# [screenshots/produce/in_produce/skill_card_enhance.png]
if '強化' in ocr_text:
logger.info("Enhance skill card found")
device.click(*bottom_pos)
return "PSkillCardEnhance"
# 技能卡选择
if '受け取るスキルカードを選んでください' in ocr_text:
logger.info("Acquire skill card found")
acquire_skill_card()
sleep(5)
return "PSkillCardSelect"
# 奖励箱技能卡
if res := image.raw().find(gray_img, grayscaled(R.InPurodyuusu.LootBoxSkillCard)):
logger.info("Acquire skill card from loot box")
device.click(res.rect)
# 下面就是普通的技能卡选择
return acquisitions()
# 目标达成
if image.raw().find(gray_img, grayscale_cached(R.InPurodyuusu.IconClearBlue)):
logger.debug("Check gloal clear...")
if image.find(R.InPurodyuusu.IconClearBlue):
logger.info("Clear found")
logger.debug("達成: clicked")
device.click_center()
@ -132,33 +133,95 @@ def acquisitions() -> AcquisitionType | None:
logger.debug("達成 NEXT: clicked")
device.click_center()
return "Clear"
# P物品
if image.raw().find(img, R.InPurodyuusu.PItemIconColorful):
# P物品领取
logger.debug("Check PItem claim...")
if image.find(R.InPurodyuusu.PItemIconColorful):
logger.info("Click to finish PItem acquisition")
device.click_center()
sleep(1)
return "PItem"
return "PItemClaim"
# 网络中断弹窗
if image.raw().find(img, R.Common.TextNetworkError):
logger.debug("Check network error popup...")
if image.find(R.Common.TextNetworkError):
logger.info("Network error popup found")
device.click(image.expect(R.Common.ButtonRetry))
return "NetworkError"
# 加载画面
if loading():
logger.info("Loading screen found")
wait_loading_end()
# 支援卡
# logger.info("Check support card acquisition...")
# 记忆
# 跳过未读交流
logger.debug("Check skip commu...")
if check_and_skip_commu(img):
return "SkipCommu"
# TODO: 在这里加入定时点击以避免再某个地方卡住
# === 需要 OCR 的放在最后执行 ===
# 物品选择对话框
logger.debug("Check award select dialog...")
if result := ocr.find(contains("受け取る"), rect=R.InPurodyuusu.BoxSelectPStuff):
logger.info("Award select dialog found.")
logger.debug(f"Dialog text: {result.text}")
# P饮料选择
logger.debug("Check PDrink select...")
if "Pドリンク" in result.text:
logger.info("PDrink select found")
acquire_pdorinku(index=0)
return "PDrinkSelect"
# 技能卡选择
logger.debug("Check skill card select...")
if "スキルカード" in result.text:
logger.info("Acquire skill card found")
acquire_skill_card()
return "PSkillCardSelect"
# P物品选择
logger.debug("Check PItem select...")
if "Pアイテム" in result.text:
logger.info("Acquire PItem found")
select_p_item()
return "PItemSelect"
# 技能卡变更事件
logger.debug("Check skill card events...")
if result := ocr.ocr(rect=R.InPurodyuusu.BoxSkillCardEnhaced).squash():
# 技能卡更换(支援卡效果)
# [screenshots/produce/in_produce/support_card_change.png]
if "チェンジ" in result.text:
logger.info("Change skill card found")
device.click(*bottom_pos)
return "PSkillCardChange"
# 技能卡强化
# [screenshots/produce/in_produce/skill_card_enhance.png]
if "強化" in result.text:
logger.info("Enhance skill card found")
device.click(*bottom_pos)
return "PSkillCardEnhance"
# 技能卡获取
# [res/sprites/jp/in_purodyuusu/screenshot_skill_card_acquired.png]
if ocr.find("スキルカード獲得", rect=R.InPurodyuusu.BoxSkillCardAcquired):
logger.info("Acquire skill card from loot box")
device.click_center()
# 下面就是普通的技能卡选择
sleep(0.2)
return acquisitions()
return None
def until_acquisition_clear():
"""
处理各种奖励弹窗直到没有新的奖励弹窗为止
前置条件任意\n
结束条件任意
"""
interval = Interval(0.6)
while acquisitions():
interval.wait()
if __name__ == '__main__':
from logging import getLogger
import logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
getLogger('kotonebot').setLevel(logging.DEBUG)
getLogger(__name__).setLevel(logging.DEBUG)
select_p_item()

View File

@ -4,10 +4,11 @@ import logging
from cv2.typing import MatLike
from .. import R
from kotonebot import device, image, color, user, rect_expand, until, action, sleep
from kotonebot import device, image, color, user, rect_expand, until, action, sleep, use_screenshot
logger = logging.getLogger(__name__)
@action('检查是否处于交流')
def is_at_commu():
return image.find(R.Common.ButtonCommuFastforward) is not None
@ -16,7 +17,7 @@ def is_at_commu():
def skip_commu():
device.click(image.expect_wait(R.Common.ButtonCommuSkip))
@action('检查并跳过交流')
@action('检查并跳过交流', screenshot_mode='manual')
def check_and_skip_commu(img: MatLike | None = None) -> bool:
"""
检查当前是否处在未读交流并自动跳过
@ -26,8 +27,7 @@ def check_and_skip_commu(img: MatLike | None = None) -> bool:
"""
ret = False
logger.info('Check and skip commu')
if img is None:
img = device.screenshot()
img = use_screenshot(img)
skip_btn = image.find(R.Common.ButtonCommuFastforward)
if skip_btn is None:
logger.info('No fast forward button found. Not at a commu.')
@ -49,7 +49,7 @@ def check_and_skip_commu(img: MatLike | None = None) -> bool:
else:
logger.info('Fast forwarding. No action needed.')
logger.debug('Wait until not at commu')
until(lambda: not is_at_commu(), interval=1)
until(lambda: not is_at_commu(), interval=0.3)
logger.info('Fast forward done')
return ret

View File

@ -1,25 +1,31 @@
import time
import random
import logging
import unicodedata
from typing import Literal
from typing_extensions import deprecated
from typing import Generic, Iterable, Literal, NamedTuple, Callable, Generator, TypeVar, ParamSpec, cast
import cv2
import numpy as np
from cv2.typing import MatLike
from kotonebot.backend.context.context import use_screenshot
from .. import R
from . import loading
from .scenes import at_home
from .common import acquisitions
from ..common import conf
from kotonebot.backend.dispatch import DispatcherContext
from kotonebot.backend.util import AdaptiveWait, UnrecoverableError, crop, cropped
from kotonebot import ocr, device, contains, image, regex, action, debug, config, sleep
from .scenes import at_home
from .common import until_acquisition_clear
from kotonebot.errors import UnrecoverableError
from kotonebot.backend.util import AdaptiveWait, Countdown, crop, cropped
from kotonebot.backend.dispatch import DispatcherContext, SimpleDispatcher
from kotonebot import ocr, device, contains, image, regex, action, sleep, color, Rect
from .non_lesson_actions import (
enter_allowance, allowance_available, study_available, enter_study,
is_rest_available, rest
)
class SkillCard(NamedTuple):
available: bool
rect: Rect
logger = logging.getLogger(__name__)
ActionType = None | Literal['lesson', 'rest']
@ -126,39 +132,50 @@ def handle_recommended_action(final_week: bool = False) -> ActionType:
device.double_click(image.expect_wait(template))
return 'lesson'
def before_start_action():
"""检测支援卡剧情、领取资源等"""
raise NotImplementedError()
@action('打出推荐卡')
# TODO: 这里面的结果也加入 debug 显示
def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
"""点击推荐卡片
:param timeout: 超时时间()
:param card_count: 卡片数量(2-4)
:return: 执行结果-1=失败0~3=卡片位置10=跳过此回合
class CardDetectResult(NamedTuple):
type: int
"""
import cv2
import numpy as np
from cv2.typing import MatLike
点击的卡片类型
# 定义检测参数
TARGET_ASPECT_RATIO_RANGE = (0.73, 0.80)
TARGET_COLOR = (240, 240, 240)
0=第一张卡片1=第二张卡片2=第三张卡片3=第四张卡片10=SKIP
"""
score: float
"""总分数"""
left_score: float
"""左边分数"""
right_score: float
"""右边分数"""
top_score: float
"""上边分数"""
bottom_score: float
"""下边分数"""
rect: Rect
def detect_recommended_card(
card_count: int,
threshold_predicate: Callable[[CardDetectResult], bool],
*,
img: MatLike | None = None,
):
"""
识别推荐卡片
前置条件练习或考试中\n
结束状态-
:param card_count: 卡片数量(2-4)
:param threshold_predicate: 阈值判断函数
:return: 执行结果若返回 None表示未识别到推荐卡片
"""
YELLOW_LOWER = np.array([20, 100, 100])
YELLOW_UPPER = np.array([30, 255, 255])
GLOW_EXTENSION = 10 # 向外扩展的像素数
GLOW_THRESHOLD = 1200 # 荧光值阈值
# 固定的卡片坐标 (for 720x1280)
CARD_POSITIONS_1 = [
# 格式:(x, y, w, h, return_value)
(264, 883, 192, 252, 0)
]
CARD_POSITIONS_2 = [
(156, 883, 192, 252, 1),
(372, 883, 192, 252, 2),
(156, 883, 192, 252, 0),
(372, 883, 192, 252, 1),
# delta_x = 216, delta_x-width = 24
]
CARD_POSITIONS_3 = [
@ -175,189 +192,174 @@ def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
# delta_x = 165, delta_x-width = -27
]
SKIP_POSITION = (621, 739, 85, 85, 10)
GLOW_EXTENSION = 15
@deprecated('此方法待改进')
def calc_pos(card_count: int):
# 根据卡片数量计算实际位置
CARD_PAD = 25
CARD_SCREEN_PAD = 17
card_positions = []
if card_count == 1:
cards = CARD_POSITIONS_1
elif card_count == 2:
cards = CARD_POSITIONS_2
elif card_count == 3:
cards = CARD_POSITIONS_3
elif card_count == 4:
cards = CARD_POSITIONS_4
else:
raise ValueError(f"Unsupported card count: {card_count}")
cards.append(SKIP_POSITION)
image = use_screenshot(img)
results: list[CardDetectResult] = []
for x, y, w, h, return_value in cards:
outer = (max(0, x - GLOW_EXTENSION), max(0, y - GLOW_EXTENSION))
# 裁剪出检测区域
glow_area = image[outer[1]:y + h + GLOW_EXTENSION, outer[0]:x + w + GLOW_EXTENSION]
area_h = glow_area.shape[0]
area_w = glow_area.shape[1]
glow_area[GLOW_EXTENSION:area_h-GLOW_EXTENSION, GLOW_EXTENSION:area_w-GLOW_EXTENSION] = 0
# 过滤出目标黄色
glow_area = cv2.cvtColor(glow_area, cv2.COLOR_BGR2HSV)
yellow_mask = cv2.inRange(glow_area, YELLOW_LOWER, YELLOW_UPPER)
# 计算卡片位置
if card_count == 1:
card_positions = [CARD_POSITIONS_3[1]] # 只使用中间位置
else:
# 计算原始卡片间距
card_spacing = CARD_POSITIONS_3[1][0] - CARD_POSITIONS_3[0][0]
card_width = CARD_POSITIONS_3[0][2]
# 计算屏幕可用宽度
screen_width = 720
available_width = screen_width - (CARD_SCREEN_PAD * 2)
# 计算使用原始间距时的总宽度
original_total_width = (card_count - 1) * card_spacing + card_width
# 判断是否需要重叠布局
if original_total_width > available_width:
spacing = (available_width - card_width * card_count - CARD_SCREEN_PAD * 2) // (card_count)
start_x = CARD_SCREEN_PAD
else:
spacing = card_spacing
start_x = (screen_width - original_total_width) // 2
# 生成所有卡片位置
x = start_x
for i in range(card_count):
y = CARD_POSITIONS_3[0][1]
w = CARD_POSITIONS_3[0][2]
h = CARD_POSITIONS_3[0][3]
card_positions.append((round(x), round(y), round(w), round(h)))
x += spacing + card_width
return card_positions
# 分割出每一边
left_border = yellow_mask[:, 0:GLOW_EXTENSION]
right_border = yellow_mask[:, area_w-GLOW_EXTENSION:area_w]
top_border = yellow_mask[0:GLOW_EXTENSION, :]
bottom_border = yellow_mask[area_h-GLOW_EXTENSION:area_h, :]
y_border_pixels = area_h * GLOW_EXTENSION
x_border_pixels = area_w * GLOW_EXTENSION
def calc_pos2(card_count: int):
if card_count == 1:
return CARD_POSITIONS_1
elif card_count == 2:
return CARD_POSITIONS_2
elif card_count == 3:
return CARD_POSITIONS_3
elif card_count == 4:
return CARD_POSITIONS_4
else:
raise ValueError(f"Unsupported card count: {card_count}")
# 计算每一边的分数
left_score = np.count_nonzero(left_border) / y_border_pixels
right_score = np.count_nonzero(right_border) / y_border_pixels
top_score = np.count_nonzero(top_border) / x_border_pixels
bottom_score = np.count_nonzero(bottom_border) / x_border_pixels
if card_count == 4:
# 随机选择一张卡片点击
# TODO: 支持对四张卡片进行检测
logger.warning("4 cards detected, detecting glowing card in 4 cards is not supported yet.")
logger.info("Click random card")
card_index = random.randint(0, 3)
device.click(CARD_POSITIONS_4[card_index][:4])
sleep(1)
device.click(CARD_POSITIONS_4[card_index][:4])
return card_index
result = (left_score + right_score + top_score + bottom_score) / 4
results.append(CardDetectResult(
return_value,
result,
left_score,
right_score,
top_score,
bottom_score,
(x, y, w, h)
))
start_time = time.time()
while time.time() - start_time < timeout:
img = device.screenshot()
filtered_results = list(filter(threshold_predicate, results))
if not filtered_results:
max_result = max(results, key=lambda x: x.score)
logger.info("Max card detect result (discarded): value=%d score=%.4f borders=(%.4f, %.4f, %.4f, %.4f)",
max_result.type,
max_result.score,
max_result.left_score,
max_result.right_score,
max_result.top_score,
max_result.bottom_score
)
return None
filtered_results.sort(key=lambda x: x.score, reverse=True)
logger.info("Max card detect result: value=%d score=%.4f borders=(%.4f, %.4f, %.4f, %.4f)",
filtered_results[0].type,
filtered_results[0].score,
filtered_results[0].left_score,
filtered_results[0].right_score,
filtered_results[0].top_score,
filtered_results[0].bottom_score
)
return filtered_results[0]
# 检测卡片
card_glows = []
for x, y, w, h, return_value in calc_pos2(card_count) + [SKIP_POSITION]:
# 获取扩展后的卡片区域坐标
outer_x = max(0, x - GLOW_EXTENSION)
outer_y = max(0, y - GLOW_EXTENSION)
outer_w = w + (GLOW_EXTENSION * 2)
outer_h = h + (GLOW_EXTENSION * 2)
# 获取内外两个区域
outer_region = img[outer_y:y+h+GLOW_EXTENSION, outer_x:x+w+GLOW_EXTENSION]
inner_region = img[y:y+h, x:x+w]
# 创建掩码
outer_hsv = cv2.cvtColor(outer_region, cv2.COLOR_BGR2HSV)
inner_hsv = cv2.cvtColor(inner_region, cv2.COLOR_BGR2HSV)
# 计算外部区域的黄色部分
outer_mask = cv2.inRange(outer_hsv, YELLOW_LOWER, YELLOW_UPPER)
inner_mask = cv2.inRange(inner_hsv, YELLOW_LOWER, YELLOW_UPPER)
# 创建环形区域的掩码(仅计算扩展区域的荧光值)
ring_mask = outer_mask.copy()
ring_mask[GLOW_EXTENSION:GLOW_EXTENSION+h, GLOW_EXTENSION:GLOW_EXTENSION+w] = 0
# 计算环形区域的荧光值
glow_value = cv2.countNonZero(ring_mask)
card_glows.append((x, y, w, h, glow_value, return_value))
def handle_recommended_card(
card_count: int, timeout: float = 7,
threshold_predicate: Callable[[CardDetectResult], bool] = lambda _: True,
*,
img: MatLike | None = None,
):
# cd = Countdown(seconds=timeout)
# while not cd.expired():
# result = detect_recommended_card(card_count, threshold_predicate, img=img)
# if result is not None:
# device.double_click(result)
# return result
# sleep(np.random.uniform(0.01, 0.1))
# return None
# 找到荧光值最高的卡片
if not card_glows:
logger.debug("No glowing card found, retrying...")
continue
else:
max_glow_card = max(card_glows, key=lambda x: x[4])
x, y, w, h, glow_value, return_value = max_glow_card
if glow_value < GLOW_THRESHOLD:
logger.debug("Glow value is too low, retrying...")
continue
# 点击卡片中心
logger.debug(f"Click glowing card at: ({x + w//2}, {y + h//2})")
device.click(x + w//2, y + h//2)
sleep(random.uniform(0.5, 1.5))
device.click(x + w//2, y + h//2)
# 体力溢出提示框
# 跳过回合提示框 [screenshots/produce/in_produce/skip_turn_popup.png]
while image.wait_for(R.Common.ButtonIconCheckMark, timeout=1):
logger.info("Confirmation dialog detected")
device.click()
if return_value == 10:
logger.info("No enough AP. Skip this turn")
elif return_value == -1:
logger.warning("No glowing card found")
else:
logger.info("Recommended card is Card %d", return_value + 1)
return return_value
return -1
result = detect_recommended_card(card_count, threshold_predicate, img=img)
if result is not None:
device.double_click(result)
return result
return None
@action('获取当前卡片数量')
def skill_card_count():
@action('获取当前卡片数量', screenshot_mode='manual-inherit')
def skill_card_count(img: MatLike | None = None):
"""获取当前持有的技能卡数量"""
device.click(0, 0)
sleep(0.5)
img = device.screenshot()
img = use_screenshot(img)
img = crop(img, y1=0.83, y2=0.90)
count = image.raw().count(img, R.InPurodyuusu.A, threshold=0.85)
count += image.raw().count(img, R.InPurodyuusu.M, threshold=0.85)
count = image.raw().count(img, R.InPurodyuusu.A)
count += image.raw().count(img, R.InPurodyuusu.M)
logger.info("Current skill card count: %d", count)
return count
@action('获取剩余回合数和积分')
def remaing_turns_and_points():
"""获取剩余回合数和积分"""
ret = ocr.ocr()
logger.debug("ocr.ocr: %s", ret)
def index_of(text: str) -> int:
for i, item in enumerate(ret):
# CLEARまで -> CLEARまで
if text == unicodedata.normalize('NFKC', item.text):
return i
return -1
turns_tip_index = index_of("残りターン数")
points_tip_index = index_of("CLEARまで")
turns_rect = ret[turns_tip_index].rect
# 向下扩展100像素
turns_rect_extended = (
turns_rect[0], # x
turns_rect[1], # y
turns_rect[2], # width
turns_rect[3] + 100 # height + 100
)
# 裁剪并再次识别
turns_img = device.screenshot()[
turns_rect_extended[1]:turns_rect_extended[1]+turns_rect_extended[3],
turns_rect_extended[0]:turns_rect_extended[0]+turns_rect_extended[2]
]
turns_ocr = ocr.raw().ocr(turns_img)
logger.debug("turns_ocr: %s", turns_ocr)
Yield = TypeVar('Yield')
Send = TypeVar('Send')
Return = TypeVar('Return')
P = ParamSpec('P')
class GeneratorWrapper(Iterable[Yield], Generic[P, Yield, Send, Return]):
def __init__(
self,
generator_func: Callable[P, Generator[Yield, Send, Return]],
*args: P.args,
**kwargs: P.kwargs
):
self.generator_func = generator_func
self.generator = generator_func(*args, **kwargs)
self.args = args
self.kwargs = kwargs
def __iter__(self):
return self
def __call__(self):
return next(self.generator)
def reset(self):
self.generator = self.generator_func(*self.args, **self.kwargs)
def loop(self) -> Return:
while True:
try:
next(self.generator)
except StopIteration as e:
return cast(Return, e.value)
@action('获取当前卡牌信息', screenshot_mode='manual-inherit')
def obtain_cards(img: MatLike | None = None):
img = use_screenshot(img)
cards_rects = image.find_all_multi([
R.InPurodyuusu.A,
R.InPurodyuusu.M
])
logger.info("Current cards: %s", len(cards_rects))
cards = []
for result in cards_rects:
available = color.find_rgb('#7a7d7d', rect=result.rect) is None
cards.append(SkillCard(available=available, rect=result.rect))
return cards
@action('等待进入行动场景')
def until_action_scene():
"""等待进入行动场景"""
# 检测是否到行动页面
while not image.wait_for_any([
while not image.find_multi([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
], timeout=1):
]):
logger.info("Action scene not detected. Retry...")
acquisitions()
sleep(1)
until_acquisition_clear()
else:
logger.info("Now at action scene.")
return
@ -365,70 +367,138 @@ def until_action_scene():
@action('等待进入练习场景')
def until_practice_scene():
"""等待进入练习场景"""
while image.wait_for(R.InPurodyuusu.TextClearUntil, timeout=1) is None:
acquisitions()
sleep(1)
while image.find(R.InPurodyuusu.TextClearUntil) is None:
until_acquisition_clear()
@action('等待进入考试场景')
def until_exam_scene():
"""等待进入考试场景"""
while ocr.find(regex("合格条件|三位以上")) is None:
acquisitions()
sleep(1)
until_acquisition_clear()
@action('执行练习')
@action('执行练习', screenshot_mode='manual')
def practice():
"""执行练习"""
"""
执行练习
前置条件位于练习场景\n
结束状态各种奖励领取弹窗加载画面等
"""
logger.info("Practice started")
def threshold_predicate(result: CardDetectResult):
border_scores = (result.left_score, result.right_score, result.top_score, result.bottom_score)
return (
result.score >= 0.03
# and len(list(filter(lambda x: x >= 0.01, border_scores))) >= 3
)
# 循环打出推荐卡
while True:
with device.pinned():
count = skill_card_count()
if count == 0:
logger.info("No skill card found. Wait and retry...")
if not image.find_multi([
R.InPurodyuusu.TextPerfectUntil,
R.InPurodyuusu.TextClearUntil
]):
logger.info("PERFECTまで/CLEARまで not found. Practice finished.")
break
sleep(3)
continue
if click_recommended_card(card_count=count) == -1:
logger.info("Click recommended card failed. Retry...")
img = device.screenshot()
if image.find(R.Common.ButtonIconCheckMark):
logger.info("Confirmation dialog detected")
device.click()
sleep(3) # 等待卡片刷新
continue
logger.info("Wait for next turn...")
sleep(9)
# 跳过动画
logger.info("Recommend card not found. Practice finished.")
ocr.expect_wait(contains("上昇"))
logger.info("Click to finish 上昇 ")
device.click_center()
card_count = skill_card_count(img)
# cards = obtain_cards(img)
# card_count = len(cards)
# available_cards = [card for card in cards if card.available]
# if len(available_cards) == 1:
# device.double_click(available_cards[0].rect)
# sleep(3) # 等待卡片刷新
# continue
if card_count > 0 and handle_recommended_card(
card_count=card_count,
threshold_predicate=threshold_predicate,
img=img
) is not None:
sleep(3)
elif (
card_count == 0
and not image.find_multi([
R.InPurodyuusu.TextClearUntil,
R.InPurodyuusu.TextPerfectUntil
])
):
break
sleep(np.random.uniform(0.01, 0.2))
# 结束动画
logger.info("CLEAR/PERFECT not found. Practice finished.")
(SimpleDispatcher('practice.end')
.click(contains("上昇"), finish=True, log="Click to finish 上昇 ")
.click('center')
).run()
@action('执行考试')
def exam():
"""执行考试"""
logger.info("Exam started")
# 循环打出推荐卡
while True:
count = skill_card_count()
if count == 0:
logger.info("No skill card found. Wait and retry...")
if not image.wait_for(R.InPurodyuusu.TextButtonExamSkipTurn, timeout=20):
logger.info("Exam skip turn button not found. Exam finished.")
break
sleep(3)
continue
if click_recommended_card(card_count=count) == -1:
logger.info("Click recommended card failed. Retry...")
continue
logger.info("Wait for next turn...")
sleep(9)
def exam(type: Literal['mid', 'final']):
"""
执行考试
前置条件考试进行中场景手牌可见\n
结束状态考试结束交流/对话TODO截图
"""
logger.info("Exam started")
def threshold_predicate(result: CardDetectResult):
if type == 'final':
return (
result.score >= 0.4
and result.left_score >= 0.2
and result.right_score >= 0.2
and result.top_score >= 0.2
and result.bottom_score >= 0.2
)
else:
return result.score >= 0.10
# 关于上面阈值的解释:
# 两个阈值均指卡片周围的“黄色度”,
# total_threshold 指卡片平均的黄色度阈值border_thresholds 指卡片四边的黄色度阈值
# 为什么期中和期末考试阈值不一样:
# 期末考试的场景为黄昏,背景中含有大量黄色,
# 非常容易对推荐卡的检测造成干扰。
# 解决方法是提高平均阈值的同时,为每一边都设置阈值。
# 这样可以筛选出只有四边都包含黄色的发光卡片,
# 而由夕阳背景造成的假发光卡片通常不会四边都包含黄色。
while True:
img = device.screenshot()
if image.find(R.Common.ButtonIconCheckMark):
logger.info("Confirmation dialog detected")
device.click()
sleep(3) # 等待卡片刷新
continue
card_count = skill_card_count(img)
# cards = obtain_cards(img)
# card_count = len(cards)
# available_cards = [card for card in cards if card.available]
# if len(available_cards) == 1:
# device.double_click(available_cards[0].rect)
# sleep(3) # 等待卡片刷新
# continue
if card_count > 0 and handle_recommended_card(
card_count=card_count,
threshold_predicate=threshold_predicate,
img=img
) is not None:
sleep(3) # 等待卡片刷新
elif (
card_count == 0
and not ocr.find(contains('残りターン'), rect=R.InPurodyuusu.BoxExamTop)
):
break
sleep(np.random.uniform(0.01, 0.1))
# 点击“次へ”
device.click(image.expect_wait(R.Common.ButtonNext))
while ocr.wait_for(contains("メモリー"), timeout=7):
device.click_center()
if type == 'final':
while ocr.wait_for(contains("メモリー"), timeout=7):
device.click_center()
@action('考试结束流程')
def produce_end():
@ -537,6 +607,59 @@ def produce_end():
sleep(1)
logger.info("Produce completed.")
def week_normal():
until_action_scene()
executed_action = handle_recommended_action()
logger.info("Executed recommended action: %s", executed_action)
# 推荐练习
if executed_action == 'lesson':
until_practice_scene()
practice()
# 推荐休息
elif executed_action == 'rest':
pass
# 没有推荐行动
elif executed_action is None:
if allowance_available():
enter_allowance()
elif study_available():
enter_study()
elif is_rest_available():
rest()
else:
raise ValueError("No action available.")
until_action_scene()
def week_final_lesson():
if handle_recommended_action(final_week=True) != 'lesson':
raise ValueError("Failed to enter recommended action on final week.")
sleep(5)
until_practice_scene()
practice()
def week_mid_exam():
logger.info("Week mid exam started.")
logger.info("Wait for exam scene...")
until_exam_scene()
logger.info("Exam scene detected.")
sleep(5)
device.click_center()
sleep(5)
exam('mid')
until_action_scene()
def week_final_exam():
logger.info("Week final exam started.")
logger.info("Wait for exam scene...")
until_exam_scene()
logger.info("Exam scene detected.")
sleep(5)
device.click_center()
sleep(0.5)
loading.wait_loading_end()
exam('final')
produce_end()
@action('执行 Regular 培育')
def hajime_regular(week: int = -1, start_from: int = 1):
"""
@ -545,91 +668,7 @@ def hajime_regular(week: int = -1, start_from: int = 1):
:param week: 第几周从1开始-1表示全部
:param start_from: 从第几周开始从1开始
"""
def week_lesson():
until_action_scene()
executed_action = handle_recommended_action()
logger.info("Executed recommended action: %s", executed_action)
if executed_action == 'lesson':
sleep(5)
until_practice_scene()
practice()
elif executed_action == 'rest':
pass
elif executed_action is None:
rest()
until_action_scene()
def week_non_lesson():
"""非练习周。可能可用行动包括:おでかけ、相談、活動支給、授業"""
until_action_scene()
if handle_recommended_action() == 'rest':
logger.info("Recommended action is rest.")
elif allowance_available():
enter_allowance()
elif study_available():
enter_study()
elif is_rest_available():
rest()
else:
raise ValueError("No action available.")
until_action_scene()
def week_normal():
until_action_scene()
executed_action = handle_recommended_action()
logger.info("Executed recommended action: %s", executed_action)
# 推荐练习
if executed_action == 'lesson':
until_practice_scene()
practice()
# 推荐休息
elif executed_action == 'rest':
pass
# 没有推荐行动
elif executed_action is None:
if allowance_available():
enter_allowance()
elif study_available():
enter_study()
elif is_rest_available():
rest()
else:
raise ValueError("No action available.")
until_action_scene()
def week_final_lesson():
if handle_recommended_action(final_week=True) != 'lesson':
raise ValueError("Failed to enter recommended action on final week.")
sleep(5)
until_practice_scene()
practice()
# until_exam_scene()
def week_mid_exam():
logger.info("Week mid exam started.")
logger.info("Wait for exam scene...")
until_exam_scene()
logger.info("Exam scene detected.")
sleep(5)
device.click_center()
sleep(5)
exam()
until_action_scene()
def week_final_exam():
logger.info("Week final exam started.")
logger.info("Wait for exam scene...")
until_exam_scene()
logger.info("Exam scene detected.")
sleep(5)
device.click_center()
sleep(0.5)
loading.wait_loading_end()
exam()
produce_end()
weeks = [
# TODO: 似乎一部分选项是随机出现的
week_normal, # 1: Vo.レッスン、Da.レッスン、Vi.レッスン
week_normal, # 2: 授業
week_normal, # 3: Vo.レッスン、Da.レッスン、Vi.レッスン、授業
@ -656,6 +695,40 @@ def hajime_regular(week: int = -1, start_from: int = 1):
logger.info("Week %d started.", i + start_from)
w()
@action('执行 PRO 培育')
def hajime_pro(week: int = -1, start_from: int = 1):
"""
PRO 模式
:param week: 第几周从1开始-1表示全部
:param start_from: 从第几周开始从1开始
"""
weeks = [
week_normal, # 1
week_normal, # 2
week_normal, # 3
week_normal, # 4
week_normal, # 5
week_final_lesson, # 6
week_mid_exam, # 7
week_normal, # 8
week_normal, # 9
week_normal, # 10
week_normal, # 11
week_normal, # 12
week_normal, # 13
week_normal, # 14
week_final_lesson, # 15
week_final_exam, # 16
]
if week != -1:
logger.info("Week %d started.", week)
weeks[week - 1]()
else:
for i, w in enumerate(weeks[start_from-1:]):
logger.info("Week %d started.", i + start_from)
w()
@action('是否在考试场景')
def is_exam_scene():
"""是否在考试场景"""
@ -663,7 +736,7 @@ def is_exam_scene():
ProduceStage = Literal[
'action', # 行动场景
'practice', # 练习场景
'practice-ongoing', # 练习场景
'exam-start', # 考试开始确认页面
'exam-ongoing', # 考试进行中
'exam-end', # 考试结束
@ -681,26 +754,30 @@ def detect_regular_produce_scene(ctx: DispatcherContext) -> ProduceStage:
logger.info("Detecting current produce stage...")
# 行动场景
texts = ocr.ocr(rect=R.InPurodyuusu.BoxWeeksUntilExam)
texts = ocr.ocr()
if (
image.find_multi([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
])
and (texts.where(contains('')).first())
):
week = texts.squash().numbers()
if week:
logger.info("Detection result: At action scene. Current week: %d", week[0])
ctx.finish()
return 'action'
else:
return 'unknown'
logger.info("Detection result: At action scene.")
ctx.finish()
return 'action'
elif texts.where(regex('CLEARまで|PERFECTまで')):
logger.info("Detection result: At practice ongoing.")
ctx.finish()
return 'practice-ongoing'
elif is_exam_scene():
logger.info("Detection result: At exam scene.")
ctx.finish()
return 'exam-ongoing'
elif texts.where(regex('合格条件|三位以上')):
logger.info("Detection result: At exam start.")
ctx.finish()
return 'exam-start'
else:
until_acquisition_clear()
return 'unknown'
@action('开始 Regular 培育')
@ -711,7 +788,7 @@ def hajime_regular_from_stage(stage: ProduceStage):
if stage == 'action':
texts = ocr.ocr(rect=R.InPurodyuusu.BoxWeeksUntilExam)
# 提取周数
remaining_week = texts.squash().numbers()
remaining_week = texts.squash().replace('ó', '6').numbers()
if not remaining_week:
raise UnrecoverableError("Failed to detect week.")
# 判断阶段
@ -723,11 +800,26 @@ def hajime_regular_from_stage(stage: ProduceStage):
hajime_regular(start_from=week)
else:
raise UnrecoverableError("Failed to detect produce stage.")
elif stage == 'exam-start':
device.click_center()
until_exam_scene()
exam()
elif stage == 'exam-ongoing':
# TODO: 应该直接调用 week_final_exam 而不是再写一次
logger.info("Exam ongoing. Start exam.")
exam()
produce_end()
result = ocr.expect_wait(contains('中間|最終'))
if '中間' in result.text:
return hajime_regular_from_stage(detect_regular_produce_scene())
elif '最終' in result.text:
produce_end()
else:
raise UnrecoverableError("Failed to detect produce stage.")
elif stage == 'practice-ongoing':
# TODO: 应该直接调用 week_final_exam 而不是再写一次
logger.info("Practice ongoing. Start practice.")
practice()
return hajime_regular_from_stage(detect_regular_produce_scene())
else:
raise UnrecoverableError(f'Cannot resume produce REGULAR from stage "{stage}".')
@ -745,11 +837,26 @@ if __name__ == '__main__':
getLogger('kotonebot').setLevel(logging.DEBUG)
getLogger(__name__).setLevel(logging.DEBUG)
stage = (detect_regular_produce_scene())
hajime_regular_from_stage(stage)
# while True:
# cards = obtain_cards()
# print(cards)
# sleep(1)
# practice()
# week_final_exam()
exam('final')
produce_end()
# hajime_pro(start_from=15)
# exam('mid')
# stage = (detect_regular_produce_scene())
# hajime_regular_from_stage(stage)
# click_recommended_card(card_count=skill_card_count())
# exam()
# exam('mid')
# hajime_regular(start_from=7)

View File

@ -10,6 +10,7 @@ from .. import R
logger = getLogger(__name__)
@action('检测加载页面', screenshot_mode='manual')
def loading() -> bool:
"""检测是否在场景加载页面"""
img = device.screenshot()

View File

@ -46,13 +46,12 @@ def enter_study():
while not image.find(R.InPurodyuusu.IconTitleStudy):
logger.debug("Waiting for 授業 screen.")
check_and_skip_commu()
sleep(1)
acquisitions()
# 固定点击 Vi. 选项
logger.debug("Clicking on Vi. option.")
device.double_click(image.expect_wait(R.InPurodyuusu.ButtonIconStudyVisual))
while acquisitions() is None:
logger.info("Waiting for acquisitions finished.")
sleep(1)
logger.info("授業 completed.")
@ -81,7 +80,6 @@ def enter_allowance():
]))
while acquisitions() is None:
logger.info("Waiting for acquisitions finished.")
sleep(2)
# 第二个箱子
logger.info("Clicking on the second lootbox.")
device.click(image.expect_wait_any([
@ -89,7 +87,6 @@ def enter_allowance():
]))
while acquisitions() is None:
logger.info("Waiting for acquisitions finished.")
sleep(2)
logger.info("活動支給 completed.")
# wait_loading_start() # 可能会因为加载太快,截图没截到,导致抛出异常
sleep(1)

View File

@ -3,7 +3,8 @@ from typing import Callable
from .. import R
from .loading import loading
from kotonebot import device, image, action, cropped, UnrecoverableError, until, sleep
from kotonebot import device, image, action, cropped, until, sleep
from kotonebot.errors import UnrecoverableError
logger = logging.getLogger(__name__)

View File

@ -178,15 +178,15 @@ def do_produce(idol: PIdol | None = None):
sleep(0.2)
device.click(image.expect_wait(R.Produce.ButtonProduceStart))
sleep(0.5)
while not loading():
# 跳过交流设置 [screenshots/produce/skip_commu.png]
with device.pinned():
if image.find(R.Produce.RadioTextSkipCommu):
device.click()
sleep(0.2)
if image.find(R.Common.ButtonConfirmNoIcon):
device.click()
wait_loading_end()
# while not loading():
# # 跳过交流设置 [screenshots/produce/skip_commu.png]
# with device.pinned():
# if image.find(R.Produce.RadioTextSkipCommu):
# device.click()
# sleep(0.2)
# if image.find(R.Common.ButtonConfirmNoIcon):
# device.click()
# wait_loading_end()
hajime_regular()
@task('培育')

View File

@ -3,4 +3,5 @@ jinja2==3.1.5
pyinstaller==6.11.1
twine==6.1.0
dataclasses-json==0.6.7
python-lsp-server==1.12.0
python-lsp-server==1.12.0
snakeviz==2.2.2

Binary file not shown.

After

Width:  |  Height:  |  Size: 463 KiB

View File

@ -0,0 +1 @@
{"definitions":{"6b58d90d-2e5e-4b7f-bc01-941f2633de89":{"name":"InPurodyuusu.BoxSelectPStuff","displayName":"选择奖励对话框提示语","type":"hint-box","annotationId":"6b58d90d-2e5e-4b7f-bc01-941f2633de89","useHintRect":false},"b3c9e526-de47-44c2-9bda-e9a8a79fdb5a":{"name":"InPurodyuusu.BoxSelectPStuffComfirm","displayName":"选择奖励对话框 领取按钮","type":"hint-box","annotationId":"b3c9e526-de47-44c2-9bda-e9a8a79fdb5a","useHintRect":false}},"annotations":[{"id":"6b58d90d-2e5e-4b7f-bc01-941f2633de89","type":"rect","data":{"x1":62,"y1":558,"x2":655,"y2":716}},{"id":"b3c9e526-de47-44c2-9bda-e9a8a79fdb5a","type":"rect","data":{"x1":256,"y1":1064,"x2":478,"y2":1128}}]}

Binary file not shown.

After

Width:  |  Height:  |  Size: 724 KiB

View File

@ -0,0 +1 @@
{"definitions":{"6939bad1-8055-4fbe-97a4-59a3ce5be4e0":{"name":"InPurodyuusu.BoxSkillCardAcquired","displayName":"スキルカード獲得","type":"hint-box","annotationId":"6939bad1-8055-4fbe-97a4-59a3ce5be4e0","useHintRect":false}},"annotations":[{"id":"6939bad1-8055-4fbe-97a4-59a3ce5be4e0","type":"rect","data":{"x1":194,"y1":712,"x2":528,"y2":765}}]}

Binary file not shown.

After

Width:  |  Height:  |  Size: 662 KiB

View File

@ -0,0 +1 @@
{"definitions":{"6b58d90d-2e5e-4b7f-bc01-941f2633de89":{"name":"InPurodyuusu.BoxSkillCardEnhaced","displayName":"技能卡强化文本提示","type":"hint-box","annotationId":"6b58d90d-2e5e-4b7f-bc01-941f2633de89","useHintRect":false}},"annotations":[{"id":"6b58d90d-2e5e-4b7f-bc01-941f2633de89","type":"rect","data":{"x1":49,"y1":948,"x2":676,"y2":1106}}]}

Binary file not shown.

After

Width:  |  Height:  |  Size: 658 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 463 KiB