feat(*): 完善并优化 Regular 培育流程

1. 新增从主页到进入培育配置页面,到培育开始部分的逻辑处理
2. 培育行动页面加入对推荐休息情况的检测
3. 加入对培育过程中网络中断的检测
4. 培育中领取技能卡时自动识别卡片个数,而不是固定三张
5. 重命名模板匹配相关函数的名称,新增输入多个模板,输出多个结果的匹配函数
6. 新增对课程/考试过程中跳过回合的确认对话框的处理
7. 修复进入推荐行动时检测正确,但是选择了错误的行动
8. 课程/考试中检测当前卡片数量前,自动取消原先选择的卡片,避免检测数量有误
9. 自动检测并跳过培育中的未读交流
10. 新增检测支援卡的技能卡更换、技能卡强化画面
11. 调整加载画面检测的范围
12. 改进培育结束时自动流程
13. 修复行动页面点击休息按钮后没有自动等待进入下一行动周
This commit is contained in:
XcantloadX 2025-01-20 14:22:03 +08:00
parent 010731f3c6
commit 1dbf6506de
67 changed files with 956 additions and 280 deletions

View File

@ -1,7 +1,30 @@
# KotonesAutoAssistant 琴音小助手
## 功能
* 自动日常,包括
* 领取礼物(邮件)
* 领取活动费
* 领取工作奖励并自动重新安排工作
* 自动竞赛挑战
* 低配版自动培育(目前仅限 Ruglar 模式)
## 安装&使用
对模拟器的要求:
* 分辨率:目前必须是 1280x720
* 系统版本Android 10+QAPI 29这是游戏的要求
TODO
## 开发
> [!NOTE]
> 建议使用 VSCode 进行开发。
```bash
git clone https://github.com/XcantloadX/KotonesAutoAssistant.git
cd KotonesAutoAssistant
```
### 后端
```bash
python -m venv venv
source venv/bin/activate # Windows 系统: venv\Scripts\activate
pip install -r requirements.txt
@ -10,9 +33,23 @@ python tools/make_resources.py
```
然后打开 VSCode 设置搜索“SupportRestructured Text”并勾选。
### 前端
```bash
cd kotonebot-ui
npm install
npm run dev
```
### 截图
建议使用 [XnView MP](https://www.xnview.com/en/xnviewmp/) 进行截图裁剪工作。
XnView MP 可以方便的完成“打开图片 → 选区 → 另存选取为文件”这一操作。
XnView MP 可以方便的完成“打开图片 → 选区 → 裁剪图片 → 另存选取为文件”这一操作。
只需要提前设置好右键菜单:
![XnView MP 设置1](./images/xnview_setup1.png)
## 清单
- [ ] 提高课程/考试中检测推荐卡的准确率
- [ ] 微调 OCR 模型。目前 OCR 识别结果不太准确
- [ ] 支持多分辨率
- [ ] 尝试支持汉化版
- [ ] 截图AI 辅助自动裁剪 + 命名文件

View File

@ -0,0 +1,140 @@
from transformers import AutoModelForCausalLM, AutoTokenizer
import gradio as gr
from PIL import Image
import torch
import numpy as np
from PIL import ImageDraw
# 加载模型
model_id = "vikhyatk/moondream2"
revision = "2025-01-09" # Pin to specific version
def load_model():
model = AutoModelForCausalLM.from_pretrained(
model_id, trust_remote_code=True, revision=revision,
torch_dtype=torch.float16, device_map={"": "cuda"}
).to("cuda")
tokenizer = AutoTokenizer.from_pretrained(model_id, revision=revision)
return model, tokenizer
model, tokenizer = load_model()
print('Model loaded successfully')
def answer_question(image, question):
enc_image = model.encode_image(image)
answer = model.answer_question(enc_image, question, tokenizer)
return answer
def generate_caption(image, length="normal"):
result = model.caption(image, length=length)
return result["caption"]
def visual_query(image, query):
result = model.query(image, query)
return result["answer"]
def detect_objects(image, object_type):
# 获取检测结果
result = model.detect(image, object_type)
objects = result["objects"]
print("Detection result:", result) # 添加调试信息
# 在图像上标注检测结果
img_draw = image.copy()
draw = ImageDraw.Draw(img_draw)
width, height = image.size
# 为每个检测到的对象画框
for obj in objects:
# 将相对坐标转换为像素坐标
x_min = int(float(obj['x_min']) * width)
y_min = int(float(obj['y_min']) * height)
x_max = int(float(obj['x_max']) * width)
y_max = int(float(obj['y_max']) * height)
box = [x_min, y_min, x_max, y_max]
print(f"Object box (pixels): {box}") # 添加每个对象的框信息
draw.rectangle(box, outline="red", width=3)
# 返回详细的检测结果
result_text = f"Found {len(objects)} {object_type}(s)\n"
for i, obj in enumerate(objects, 1):
result_text += f"Object {i}: x={obj['x_min']:.3f}-{obj['x_max']:.3f}, y={obj['y_min']:.3f}-{obj['y_max']:.3f}\n"
return result_text, img_draw
def point_objects(image, object_type):
# 获取点定位结果
result = model.point(image, object_type)
points = result["points"]
print("Points result:", points) # 添加调试信息
# 在图像上标注点
img_draw = image.copy()
draw = ImageDraw.Draw(img_draw)
width, height = image.size
# 为每个点画圆
result_text = f"Found {len(points)} {object_type}(s)\n"
for i, point in enumerate(points, 1):
if isinstance(point, dict) and 'x' in point and 'y' in point:
# 坐标是相对值0-1需要转换为实际像素坐标
x = int(float(point['x']) * width)
y = int(float(point['y']) * height)
result_text += f"Point {i}: x={point['x']:.3f}, y={point['y']:.3f}\n"
radius = 10
draw.ellipse([x-radius, y-radius, x+radius, y+radius], fill="red")
else:
print(f"Unexpected point format: {point}")
result_text += f"Point {i}: Invalid format\n"
return result_text, img_draw
# 创建 Gradio 界面
with gr.Blocks() as demo:
gr.Markdown("# Moondream2 视觉分析演示")
with gr.Tab("问答"):
with gr.Row():
image_input1 = gr.Image(type="pil", label="上传图片")
question_input = gr.Textbox(label="输入问题")
answer_output = gr.Textbox(label="回答")
answer_button = gr.Button("获取回答")
answer_button.click(answer_question, inputs=[image_input1, question_input], outputs=answer_output)
with gr.Tab("图像描述"):
with gr.Row():
image_input2 = gr.Image(type="pil", label="上传图片")
length_input = gr.Radio(["short", "normal"], label="描述长度", value="normal")
caption_output = gr.Textbox(label="描述结果")
caption_button = gr.Button("生成描述")
caption_button.click(generate_caption, inputs=[image_input2, length_input], outputs=caption_output)
with gr.Tab("视觉查询"):
with gr.Row():
image_input3 = gr.Image(type="pil", label="上传图片")
query_input = gr.Textbox(label="输入查询")
query_output = gr.Textbox(label="查询结果")
query_button = gr.Button("执行查询")
query_button.click(visual_query, inputs=[image_input3, query_input], outputs=query_output)
with gr.Tab("物体检测"):
with gr.Row():
image_input4 = gr.Image(type="pil", label="上传图片")
object_input = gr.Textbox(label="输入要检测的物体类型")
with gr.Row():
detect_text_output = gr.Textbox(label="检测结果")
detect_image_output = gr.Image(type="pil", label="可视化结果")
detect_button = gr.Button("开始检测")
detect_button.click(detect_objects, inputs=[image_input4, object_input], outputs=[detect_text_output, detect_image_output])
with gr.Tab("点定位"):
with gr.Row():
image_input5 = gr.Image(type="pil", label="上传图片")
point_object_input = gr.Textbox(label="输入要定位的物体类型")
with gr.Row():
point_text_output = gr.Textbox(label="定位结果")
point_image_output = gr.Image(type="pil", label="可视化结果")
point_button = gr.Button("开始定位")
point_button.click(point_objects, inputs=[image_input5, point_object_input], outputs=[point_text_output, point_image_output])
demo.launch(share=True)

View File

@ -24,7 +24,13 @@ from .backend.util import (
cropped,
UnrecoverableError,
AdaptiveWait,
until,
)
from .backend.color import (
hsv_cv2web,
hsv_web2cv,
rgb_to_hsv,
hsv_to_rgb
)
from .backend.core import task, action
from .ui import user

View File

@ -1,4 +1,4 @@
from os import name
import colorsys
from typing import Literal
import numpy as np
@ -13,16 +13,61 @@ RgbColorStr = str
RgbColor = RgbColorTuple | RgbColorStr
"""颜色。三元组 `(r, g, b)` 或十六进制颜色字符串 `#RRGGBB`"""
HsbColor = tuple[int, int, int]
HsvColor = tuple[int, int, int]
"""
HSB颜色三元组 `(h, s, b)`
h: 0-180
s: 0-255
b: 0-255
HSV颜色三元组 `(h, s, v)`
"""
def _unify_color(color: RgbColor) -> RgbColor:
def hsv_web2cv(h: int, s: int, v: int) -> 'HsvColor':
"""
HSV 颜色从 Web 格式转换为 OpenCV 格式
:param h: 色相范围 [0, 360]
:param s: 饱和度范围 [0, 100]
:param v: 亮度范围 [0, 100]
:return: OpenCV 格式 HSV 颜色三元组 `(h, s, v)`范围分别为 (0-180, 0-255, 0-255)
"""
h = round(h / 2) # web 的色相范围是 0-360转为 0-180
s = round(s / 100 * 255) # web 的饱和度范围是 0-100转为 0-255
v = round(v / 100 * 255) # web 的亮度范围是 0-100转为 0-255
return (h, s, v)
def hsv_cv2web(h: int, s: int, v: int) -> 'HsvColor':
"""
HSV 颜色从 OpenCV 格式转换为 Web 格式
:param h: 色相范围 [0, 180]
:param s: 饱和度范围 [0, 255]
:param v: 亮度范围 [0, 255]
:return: Web 格式 HSV 颜色三元组 `(h, s, v)`范围分别为 (0-360, 0-100, 0-100)
"""
h = round(h * 2) # opencv 的色相范围是 0-180转为 0-360
s = round(s / 255 * 100) # opencv 的饱和度范围是 0-255转为 0-100
v = round(v / 255 * 100) # opencv 的亮度范围是 0-255转为 0-100
return (h, s, v)
def rgb_to_hsv(c: RgbColor) -> 'HsvColor':
"""
RGB 颜色转换为 HSV 颜色
:param c: RGB 颜色十六进制颜色字符串 `#RRGGBB` 或整数三元组 `(r, g, b)`。
:return: Web 格式 HSV 颜色三元组 `(h, s, v)`范围分别为 (0-360, 0-100, 0-100)
"""
c = _unify_color(c)
ret = colorsys.rgb_to_hsv(c[0] / 255, c[1] / 255, c[2] / 255)
return (round(ret[0] * 360), round(ret[1] * 100), round(ret[2] * 100))
def hsv_to_rgb(c: HsvColor) -> 'RgbColor':
"""
HSV 颜色转换为 RGB 颜色
:param c: Web 格式 HSV 颜色三元组 `(h, s, v)`范围分别为 (0-360, 0-100, 0-100)
:return: RGB 颜色整数三元组 `(r, g, b)`
"""
ret = colorsys.hsv_to_rgb(c[0] / 360, c[1] / 100, c[2] / 100)
return (round(ret[0] * 255), round(ret[1] * 255), round(ret[2] * 255))
def _unify_color(color: RgbColor) -> RgbColorTuple:
if isinstance(color, str):
if not color.startswith('#'):
raise ValueError('Hex color string must start with #')
@ -48,17 +93,17 @@ def _unify_image(image: MatLike | str) -> MatLike:
image = cv2.imread(image)
return image
def in_range(color: HsbColor, range: tuple[HsbColor, HsbColor]) -> bool:
def in_range(color: RgbColor, range: tuple[HsvColor, HsvColor]) -> bool:
"""
判断颜色是否在范围内
:param color: 颜色
:param range: 范围
:param color: RGB 颜色
:param range: Web HSV 颜色范围
"""
h, s, b = color
h1, s1, b1 = range[0]
h2, s2, b2 = range[1]
return h1 <= h <= h2 and s1 <= s <= s2 and b1 <= b <= b2
h, s, v = rgb_to_hsv(color)
h1, s1, v1 = range[0]
h2, s2, v2 = range[1]
return h1 <= h <= h2 and s1 <= s <= s2 and v1 <= v <= v2
def find_rgb(
image: MatLike | str,
@ -211,6 +256,19 @@ def dominant_color(
hex_color = f'#{rgb[0]:02x}{rgb[1]:02x}{rgb[2]:02x}'
result.append(hex_color)
if debug.enabled:
origin_image = _unify_image(image)
result_image = origin_image.copy()
if rect is not None:
x, y, w, h = rect
cv2.rectangle(result_image, (x, y), (x + w, y + h), (0, 0, 255), 2)
debug_result(
'color.dominant_color',
[result_image, origin_image],
f'arguments:\n \tcount={count}\n \trect={rect}\n'
f'result={", ".join(map(debug_color, result))}'
)
return result
if __name__ == '__main__':

View File

@ -24,12 +24,14 @@ from kotonebot.backend.image import (
CropResult,
TemplateMatchResult,
MultipleTemplateMatchResult,
find_crop,
find_all_crop,
expect,
find,
find_any,
find_many,
find_multi,
find_all,
find_all_multi,
)
import kotonebot.backend.color as raw_color
from kotonebot.backend.color import find_rgb
from kotonebot.backend.ocr import Ocr, OcrResult, jp, en, StringMatchFunction
@ -215,7 +217,7 @@ class ContextImage:
self,
template: MatLike | str,
mask: MatLike | str | None = None,
threshold: float = 0.9,
threshold: float = 0.8,
timeout: float = DEFAULT_TIMEOUT,
colored: bool = False,
*,
@ -239,7 +241,7 @@ class ContextImage:
self,
templates: list[str],
masks: list[str | None] | None = None,
threshold: float = 0.9,
threshold: float = 0.8,
timeout: float = DEFAULT_TIMEOUT,
colored: bool = False,
*,
@ -266,7 +268,7 @@ class ContextImage:
self,
template: str,
mask: str | None = None,
threshold: float = 0.9,
threshold: float = 0.8,
timeout: float = DEFAULT_TIMEOUT,
colored: bool = False,
*,
@ -290,7 +292,7 @@ class ContextImage:
self,
templates: list[str],
masks: list[str | None] | None = None,
threshold: float = 0.9,
threshold: float = 0.8,
timeout: float = DEFAULT_TIMEOUT,
colored: bool = False,
*,
@ -327,19 +329,23 @@ class ContextImage:
self.context.device.last_find = ret
return ret
@context(find_many)
def find_many(self, *args, **kwargs):
return find_many(self.context.device.screenshot(), *args, **kwargs)
@context(find_all)
def find_all(self, *args, **kwargs):
return find_all(self.context.device.screenshot(), *args, **kwargs)
@context(find_any)
def find_any(self, *args, **kwargs):
ret = find_any(self.context.device.screenshot(), *args, **kwargs)
@context(find_multi)
def find_multi(self, *args, **kwargs):
ret = find_multi(self.context.device.screenshot(), *args, **kwargs)
self.context.device.last_find = ret
return ret
@context(find_crop)
def find_crop_many(self, *args, **kwargs):
return find_crop(self.context.device.screenshot(), *args, **kwargs)
@context(find_all_multi)
def find_all_multi(self, *args, **kwargs):
return find_all_multi(self.context.device.screenshot(), *args, **kwargs)
@context(find_all_crop)
def find_all_crop(self, *args, **kwargs):
return find_all_crop(self.context.device.screenshot(), *args, **kwargs)
class ContextGlobalVars:
@ -372,6 +378,9 @@ class ContextColor:
def __init__(self, context: 'Context'):
self.context = context
def raw(self):
return raw_color
@context(find_rgb)
def find_rgb(self, *args, **kwargs):
return find_rgb(self.context.device.screenshot(), *args, **kwargs)
@ -401,7 +410,9 @@ class Context:
# HACK: 暂时写死
from adbutils import adb
adb.connect('127.0.0.1:5555')
self.__device = AdbDevice(adb.device_list()[0])
adb.connect('127.0.0.1:16384')
d = [d for d in adb.device_list() if d.serial == '127.0.0.1:5555']
self.__device = AdbDevice(d[0])
# self.__device = None
self.__ocr = ContextOcr(self)
self.__image = ContextImage(self)

View File

@ -33,3 +33,17 @@ export function loadComponent(componentPath) {
console.error(`加载组件 ${componentName} 失败:`, error);
});
}
/**
* 防抖函数
* @param {Function} func 要防抖的函数
* @param {number} wait 等待时间毫秒
* @returns {Function} 防抖后的函数
*/
export function debounce(func, wait) {
let timeout;
return function(...args) {
clearTimeout(timeout);
timeout = setTimeout(() => func.apply(this, args), wait);
};
}

View File

@ -62,6 +62,15 @@ class MultipleTemplateMatchResult(NamedTuple):
"""结果右下角坐标。"""
return (self.position[0] + self.size[0], self.position[1] + self.size[1])
@classmethod
def from_template_match_result(cls, result: TemplateMatchResult, index: int):
return cls(
score=result.score,
position=result.position,
size=result.size,
index=index
)
class CropResult(NamedTuple):
score: float
position: Point
@ -271,7 +280,7 @@ def hist_match(
avg_score = total_score / 3
return avg_score >= threshold
def find_crop(
def find_all_crop(
image: MatLike | str,
template: MatLike | str,
mask: MatLike | str | None = None,
@ -282,7 +291,7 @@ def find_crop(
remove_duplicate: bool = True,
) -> list[CropResult]:
"""
指定一个模板寻找其出现的所有位置并裁剪出结果
指定一个模板在输入图像中寻找其出现的所有位置并裁剪出结果
:param image: 图像可以是图像路径或 cv2.Mat
:param template: 模板图像可以是图像路径或 cv2.Mat
@ -321,7 +330,7 @@ def find(
remove_duplicate: bool = True,
) -> TemplateMatchResult | None:
"""
指定一个模板寻找其出现的第一个位置
指定一个模板在输入图像中寻找其出现的第一个位置
:param image: 图像可以是图像路径或 cv2.Mat
:param template: 模板图像可以是图像路径或 cv2.Mat
@ -356,7 +365,7 @@ def find(
)
return matches[0] if len(matches) > 0 else None
def find_many(
def find_all(
image: MatLike,
template: MatLike | str,
mask: MatLike | str | None = None,
@ -365,9 +374,10 @@ def find_many(
threshold: float = 0.8,
remove_duplicate: bool = True,
colored: bool = False,
debug_output: bool = True,
) -> list[TemplateMatchResult]:
"""
指定一个模板寻找所有出现的位置
指定一个模板在输入图像中寻找其出现的所有位置
:param image: 图像可以是图像路径或 cv2.Mat
:param template: 模板图像可以是图像路径或 cv2.Mat
@ -387,17 +397,17 @@ def find_many(
remove_duplicate=remove_duplicate,
colored=colored,
)
if debug.enabled:
if debug.enabled and debug_output:
result_image = _draw_result(image, results)
debug_result(
'image.find_many',
'image.find_all',
[result_image, image],
f"template: {img(template)} \n"
f"matches: {len(results)} \n"
)
return results
def find_any(
def find_multi(
image: MatLike,
templates: list[MatLike | str],
masks: list[MatLike | str | None] | None = None,
@ -408,7 +418,7 @@ def find_any(
remove_duplicate: bool = True,
) -> MultipleTemplateMatchResult | None:
"""
指定多个模板返回第一个匹配到的结果
指定多个模板在输入图像中逐个寻找模板返回第一个匹配到的结果
:param image: 图像可以是图像路径或 cv2.Mat
:param templates: 模板图像列表可以是图像路径或 cv2.Mat
@ -454,19 +464,99 @@ def find_any(
"</table>\n"
)
debug_result(
'image.find_any',
'image.find_multi',
[_draw_result(image, ret), image],
msg
)
return ret
def find_all_multi(
image: MatLike,
templates: list[MatLike | str],
masks: list[MatLike | str | None] | None = None,
*,
transparent: bool = False,
threshold: float = 0.8,
colored: bool = False,
remove_duplicate: bool = True,
) -> list[MultipleTemplateMatchResult]:
"""
指定多个模板在输入图像中逐个寻找模板返回所有匹配到的结果
此函数等价于
```python
result = []
for template in templates:
result.append(find_all(template, ...))
```
:param image: 图像可以是图像路径或 cv2.Mat
:param templates: 模板图像列表可以是图像路径或 cv2.Mat
:param masks: 掩码图像列表可以是图像路径或 cv2.Mat
:param transparent: 若为 True则认为输入模板是透明的并自动将透明模板转换为 Mask 图像
:param threshold: 阈值默认为 0.8
:param colored: 是否匹配颜色默认为 False
:param remove_duplicate: 是否移除重复结果默认为 True
:return: 匹配到的一维结果列表
"""
ret: list[MultipleTemplateMatchResult] = []
if masks is None:
_masks = [None] * len(templates)
else:
_masks = masks
for index, (template, mask) in enumerate(zip(templates, _masks)):
results = find_all(
image,
template,
mask,
transparent=transparent,
threshold=threshold,
colored=colored,
remove_duplicate=remove_duplicate,
debug_output=False,
)
ret.extend([
MultipleTemplateMatchResult.from_template_match_result(r, index)
for r in results
])
if debug.enabled:
# 参数表格
msg = (
"<center>Templates</center>"
"<table class='result-table'>"
"<tr><th>Template</th><th>Mask</th></tr>"
)
for t, m in zip(templates, _masks):
msg += f"<tr><td>{img(t)}</td><td>{img(m)}</td></tr>"
msg += "</table>"
msg += "<br>"
# 结果表格
msg += (
"<center>Results</center>"
"<table class='result-table'>"
"<tr><th>Template</th><th>Mask</th><th>Result</th></tr>"
)
for result in ret:
template = templates[result.index]
mask = _masks[result.index]
msg += f"<tr><td>{img(template)}</td><td>{img(mask)}</td><td>{result.position}</td></tr>"
msg += "</table>"
debug_result(
'image.find_all_multi',
[_draw_result(image, ret), image],
msg
)
return ret
def count(
image: MatLike,
template: MatLike | str,
mask: MatLike | str | None = None,
*,
transparent: bool = False,
threshold: float = 0.9,
threshold: float = 0.8,
remove_duplicate: bool = True,
colored: bool = False,
) -> int:
@ -512,7 +602,7 @@ def expect(
mask: MatLike | str | None = None,
*,
transparent: bool = False,
threshold: float = 0.9,
threshold: float = 0.8,
colored: bool = False,
remove_duplicate: bool = True,
) -> TemplateMatchResult:

View File

@ -11,6 +11,7 @@ from thefuzz import fuzz as _fuzz
if TYPE_CHECKING:
from kotonebot.client.protocol import DeviceABC
from kotonebot.backend.color import HsvColor
class TaskInfo(NamedTuple):
name: str
@ -172,6 +173,29 @@ def grayscaled(img: MatLike | str) -> MatLike:
def grayscale_cached(img: MatLike | str) -> MatLike:
return grayscaled(img)
def until(
condition: Callable[[], bool],
timeout: float=60,
interval: float=0.5,
critical: bool=False
) -> bool:
"""
等待条件成立如果条件不成立则返回 False 或抛出异常
:param condition: 条件函数
:param timeout: 等待时间单位为秒
:param interval: 检查条件的时间间隔单位为秒
:param critical: 如果条件不成立是否抛出异常
"""
start = time.time()
while not condition():
if time.time() - start > timeout:
if critical:
raise TimeoutError(f"Timeout while waiting for condition {condition.__name__}.")
return False
time.sleep(interval)
return True
class AdaptiveWait:
"""

View File

@ -11,6 +11,8 @@ from kotonebot import (
grayscale_cached,
action
)
from kotonebot.tasks.actions.commu import check_and_skip_commu
from .loading import loading, wait_loading_end
from .. import R
from .pdorinku import acquire_pdorinku
@ -21,61 +23,65 @@ def acquire_skill_card():
"""获取技能卡(スキルカード)"""
# TODO: 识别卡片内容,而不是固定选卡
# TODO: 不硬编码坐标
CARD_POSITIONS = [
(157, 820, 128, 128),
(296, 820, 128, 128),
(435, 820, 128, 128),
]
logger.info("Click first skill card")
device.click(CARD_POSITIONS[0])
logger.debug("Locating all skill cards...")
cards = image.find_all_multi([
R.InPurodyuusu.A,
R.InPurodyuusu.M
])
cards = sorted(cards, key=lambda x: (x.position[0], x.position[1]))
logger.info(f"Found {len(cards)} skill cards")
logger.debug("Click first skill card")
device.click(cards[0].rect)
sleep(0.5)
# 确定
logger.info("Click 受け取る")
logger.debug("Click 受け取る")
device.click(ocr.expect(contains("受け取る")).rect)
# 跳过动画
device.click(image.expect_wait_any([
R.InPurodyuusu.PSkillCardIconBlue,
R.InPurodyuusu.PSkillCardIconColorful
]))
], timeout=60))
logger.info("Skill card #1 acquired")
AcquisitionType = Literal[
"PDrinkAcquire", # P饮料被动领取
"PDrinkSelect", # P饮料主动领取
"PDrinkMax", # P饮料到达上限
"PSkillCardAcquire", # 技能卡领取
"PSkillCardChange", # 技能卡更换
"PSkillCardSelect", # 技能卡选择
"PSkillCardEnhance", # 技能卡强化
"PItem", # P物品
"Clear", # 目标达成
"NetworkError", # 网络中断弹窗
"SkipCommu", # 跳过交流
]
@action('检测并领取奖励')
# TODO: 这个函数可能要换个更好的名字
def acquisitions() -> AcquisitionType | None:
"""处理行动开始前和结束后可能需要处理的事件,直到到行动页面为止"""
img = device.screenshot_raw()
gray_img = grayscaled(img)
ocr_results = ocr.raw().ocr(img)
ocr_text = ''.join(r.text for r in ocr_results)
logger.info("Acquisition stuffs...")
# P饮料被动领取
logger.info("Check PDrink acquisition...")
if image.raw().find(img, R.InPurodyuusu.PDrinkIcon):
logger.info("Click to finish animation")
logger.info("PDrink acquire found")
device.click_center()
sleep(1)
return "PDrinkAcquire"
# P饮料主动领取
# if ocr.raw().find(img, contains("受け取るPドリンクを選れでください")):
if image.raw().find(img, R.InPurodyuusu.TextPleaseSelectPDrink):
logger.info("PDrink acquisition")
# 不领取
# device.click(ocr.expect(contains("受け取らない")))
# sleep(0.5)
# device.click(image.expect(R.InPurodyuusu.ButtonNotAcquire))
# sleep(0.5)
# device.click(image.expect(R.InPurodyuusu.ButtonConfirm))
logger.info("PDrink select found")
acquire_pdorinku(index=0)
return "PDrinkSelect"
# P饮料到达上限
if image.raw().find(img, R.InPurodyuusu.TextPDrinkMax):
logger.info("PDrink max found")
device.click(image.expect(R.InPurodyuusu.ButtonLeave))
sleep(0.7)
# 可能需要点击确认
@ -83,16 +89,28 @@ def acquisitions() -> AcquisitionType | None:
return "PDrinkMax"
# 技能卡被动领取(支援卡效果)
logger.info("Check skill card acquisition...")
if image.raw().find_any(img, [
if image.raw().find_multi(img, [
R.InPurodyuusu.PSkillCardIconBlue,
R.InPurodyuusu.PSkillCardIconColorful
]):
logger.info("Acquire skill card")
logger.info("Acquire skill card found")
device.click_center()
return "PSkillCardAcquire"
# 技能卡更换(支援卡效果)
# [screenshots/produce/in_produce/support_card_change.png]
if 'チェンジ' in ocr_text:
logger.info("Change skill card found")
device.click_center()
return "PSkillCardChange"
# 技能卡强化
# [screenshots/produce/in_produce/skill_card_enhance.png]
if '強化' in ocr_text:
logger.info("Enhance skill card found")
device.click_center()
return "PSkillCardEnhance"
# 技能卡选择
if ocr.raw().find(img, contains("受け取るスキルカードを選んでください")):
logger.info("Acquire skill card")
if '受け取るスキルカードを選んでください' in ocr_text:
logger.info("Acquire skill card found")
acquire_skill_card()
sleep(5)
return "PSkillCardSelect"
@ -104,6 +122,7 @@ def acquisitions() -> AcquisitionType | None:
return acquisitions()
# 目标达成
if image.raw().find(gray_img, grayscale_cached(R.InPurodyuusu.IconClearBlue)):
logger.info("Clear found")
logger.debug("達成: clicked")
device.click_center()
sleep(5)
@ -117,10 +136,22 @@ def acquisitions() -> AcquisitionType | None:
device.click_center()
sleep(1)
return "PItem"
# 网络中断弹窗
if image.raw().find(img, R.Common.TextNetworkError):
logger.info("Network error popup found")
device.click(image.expect(R.Common.ButtonRetry))
return "NetworkError"
# 加载画面
if loading():
logger.info("Loading screen found")
wait_loading_end()
# 支援卡
# logger.info("Check support card acquisition...")
# 记忆
# 未跳过剧情
# 跳过未读交流
if check_and_skip_commu(img):
return "SkipCommu"
# TODO: 在这里加入定时点击以避免再某个地方卡住
return None
if __name__ == '__main__':
@ -131,4 +162,3 @@ if __name__ == '__main__':
getLogger('kotonebot').setLevel(logging.DEBUG)
getLogger(__name__).setLevel(logging.DEBUG)
init_context()
acquisitions()

View File

@ -0,0 +1,73 @@
"""检测与跳过交流"""
import logging
from time import sleep
from cv2.typing import MatLike
from .. import R
from kotonebot import device, image, color, user, rect_expand, until, action
logger = logging.getLogger(__name__)
@action('检查是否处于交流')
def is_at_commu():
return image.find(R.Common.ButtonCommuFastforward) is not None
@action('跳过交流')
def skip_commu():
device.click(image.expect_wait(R.Common.ButtonCommuSkip))
@action('检查并跳过交流')
def check_and_skip_commu(img: MatLike | None = None) -> bool:
"""
检查当前是否处在未读交流并自动跳过
:param img: 截图
:return: 是否跳过了交流
"""
ret = False
logger.info('Check and skip commu')
if img is None:
img = device.screenshot()
skip_btn = image.find(R.Common.ButtonCommuFastforward)
if skip_btn is None:
logger.info('No fast forward button found. Not at a commu.')
return ret
ret = True
logger.debug('Fast forward button found. Check commu')
button_bg_rect = rect_expand(skip_btn.rect, 10, 10, 50, 10)
colors = color.raw().dominant_color(img, 2, rect=button_bg_rect)
RANGE = ((20, 65, 95), (180, 100, 100))
if not any(color.raw().in_range(c, RANGE) for c in colors):
user.info('发现未读交流', [img])
logger.debug('Not fast forwarding. Click fast forward button')
device.click(skip_btn)
sleep(0.7)
if image.find(R.Common.ButtonConfirm):
logger.debug('Click confirm button')
device.click()
else:
logger.info('Fast forwarding. No action needed.')
logger.debug('Wait until not at commu')
until(lambda: not is_at_commu(), interval=1)
logger.info('Fast forward done')
return ret
if __name__ == '__main__':
from kotonebot.backend.context import init_context
import logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
logger.setLevel(logging.DEBUG)
init_context()
print(is_at_commu())
# rect = image.expect(R.Common.ButtonCommuFastforward).rect
# print(rect)
# rect = rect_expand(rect, 10, 10, 50, 10)
# print(rect)
# img = device.screenshot()
# print(color.raw().dominant_color(img, 2, rect=rect))
# skip_commu()
# check_and_skip_commu()

View File

@ -1,19 +1,19 @@
import random
import time
import cv2
import logging
import unicodedata
from time import sleep
from typing import Literal
from typing_extensions import deprecated
import cv2
import unicodedata
import logging
from time import sleep
from .. import R
from .scenes import at_home
from . import loading
from .common import acquisitions
from kotonebot.backend.util import AdaptiveWait, crop_y, cropper_y
from .non_lesson_actions import enter_allowance, allowance_available
from kotonebot import ocr, device, contains, image, regex, action, debug
from kotonebot.backend.context import init_context
from kotonebot.backend.util import crop_y, cropper_y
from kotonebot.tasks import R
from kotonebot.tasks.actions import loading
from .non_lesson_actions import enter_allowance, study_available, enter_study, allowance_available
from .common import acquisitions, AcquisitionType, acquire_skill_card
logger = logging.getLogger(__name__)
@ -75,11 +75,11 @@ def enter_recommended_action(final_week: bool = False) -> ActionType:
# 获取课程
logger.debug("Getting recommended lesson...")
with device.hook(cropper_y(0.00, 0.30)):
result = image.find_any([
result = image.find_multi([
R.InPurodyuusu.TextSenseiTipDance,
R.InPurodyuusu.TextSenseiTipVocal,
R.InPurodyuusu.TextSenseiTipVisual,
# R.InPurodyuusu.TextSenseiTipRest, # TODO: 体力提示截图
R.InPurodyuusu.TextSenseiTipRest,
])
logger.debug("ocr.wait_for: %s", result)
if result is None:
@ -87,9 +87,9 @@ def enter_recommended_action(final_week: bool = False) -> ActionType:
return None
if not final_week:
if result.index == 0:
lesson_text = "Vo"
elif result.index == 1:
lesson_text = "Da"
elif result.index == 1:
lesson_text = "Vo"
elif result.index == 2:
lesson_text = "Vi"
elif result.index == 3:
@ -105,9 +105,9 @@ def enter_recommended_action(final_week: bool = False) -> ActionType:
return 'lesson'
else:
if result.index == 0:
template = R.InPurodyuusu.ButtonFinalPracticeVocal
elif result.index == 1:
template = R.InPurodyuusu.ButtonFinalPracticeDance
elif result.index == 1:
template = R.InPurodyuusu.ButtonFinalPracticeVocal
elif result.index == 2:
template = R.InPurodyuusu.ButtonFinalPracticeVisual
else:
@ -121,6 +121,7 @@ def before_start_action():
raise NotImplementedError()
@action('打出推荐卡')
# TODO: 这里面的结果也加入 debug 显示
def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
"""点击推荐卡片
@ -279,10 +280,10 @@ def click_recommended_card(timeout: float = 7, card_count: int = 3) -> int:
sleep(random.uniform(0.5, 1.5))
device.click(x + w//2, y + h//2)
# 体力溢出提示框
ret = image.wait_for(R.Common.ButtonConfirm, timeout=1)
if ret is not None:
logger.info("Skill card confirmation dialog detected")
device.click(ret)
# 跳过回合提示框 [screenshots/produce/in_produce/skip_turn_popup.png]
while image.wait_for(R.Common.ButtonIconCheckMark, timeout=1):
logger.info("Confirmation dialog detected")
device.click()
if return_value == 10:
logger.info("No enough AP. Skip this turn")
elif return_value == -1:
@ -317,6 +318,8 @@ def skill_card_count1():
@action('获取当前卡片数量')
def skill_card_count():
"""获取当前持有的技能卡数量"""
device.click(0, 0)
sleep(0.5)
img = device.screenshot()
img = crop_y(img, 0.83, 0.90)
count = image.raw().count(img, R.InPurodyuusu.A, threshold=0.85)
@ -409,7 +412,7 @@ def practice():
# no_card_count += 1
# if no_card_count >= MAX_NO_CARD_COUNT:
# break
if not image.find_any([
if not image.find_multi([
R.InPurodyuusu.TextPerfectUntil,
R.InPurodyuusu.TextClearUntil
]):
@ -453,42 +456,61 @@ def exam():
while ocr.wait_for(contains("メモリー"), timeout=7):
device.click_center()
@action('执行考试结束')
@action('考试结束流程')
def produce_end():
"""执行考试结束"""
# 考试结束对话 [screenshots\produce_end\step2.jpg]
image.expect_wait(R.InPurodyuusu.TextAsariProduceEnd, timeout=30)
"""执行考试结束流程"""
bottom = (int(device.screen_size[0] / 2), int(device.screen_size[1] * 0.9))
device.click(*bottom)
# 对话第二句 [screenshots\produce_end\step3.jpg]
sleep(3)
device.click_center()
sleep(6)
device.click(*bottom)
sleep(3)
device.click(*bottom)
sleep(3)
# 考试结束交流 [screenshots/produce/in_produce/final_exam_end_commu.png]
# 然后是,考试结束对话 [screenshots\produce_end\step2.jpg]
# while not image.find(R.InPurodyuusu.TextAsariProduceEnd):
# check_and_skip_commu()
# device.click(*bottom)
# sleep(1)
# # image.expect_wait(R.InPurodyuusu.TextAsariProduceEnd, timeout=30)
# sleep(0.5)
# device.click(*bottom)
# # 对话第二句 [screenshots\produce_end\step3.jpg]
# sleep(3)
# device.click_center()
# sleep(6)
# device.click(*bottom)
# sleep(3)
# device.click(*bottom)
# sleep(3)
# MV
# 等就可以了,反正又不要自己操作(
# 培育结束交流
# 结算
# 最終プロデュース評価
image.expect_wait(R.InPurodyuusu.TextFinalProduceRating, timeout=60 * 4)
device.click_center()
sleep(3)
# 最終プロデュース評価 R.InPurodyuusu.TextFinalProduceRating
# 等待选择封面画面 [screenshots/produce_end/select_cover.jpg]
# 次へ
logger.info("Waiting for select cover screen...")
wait = AdaptiveWait(timeout=60 * 5)
while not image.find(R.InPurodyuusu.ButtonNextNoIcon):
wait()
device.click(0, 0)
logger.info("Use default cover.")
sleep(3)
logger.debug("Click next")
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
sleep(1)
# 确认对话框 [screenshots/produce_end/select_cover_confirm.jpg]
# 決定
logger.debug("Click Confirm")
device.click(image.expect_wait(R.Common.ButtonConfirm, threshold=0.8))
sleep(1)
# 上传图片。注意网络可能会很慢,可能出现上传失败对话框
# 上传图片,等待“生成”按钮
# 注意网络可能会很慢,可能出现上传失败对话框
logger.info("Waiting for cover uploading...")
retry_count = 0
MAX_RETRY_COUNT = 5
while True:
img = device.screenshot()
# 处理上传失败
if image.find(R.InPurodyuusu.ButtonRetry):
if image.raw().find(img, R.InPurodyuusu.ButtonRetry):
logger.info("Upload failed. Retry...")
retry_count += 1
if retry_count >= MAX_RETRY_COUNT:
@ -499,28 +521,57 @@ def produce_end():
continue
device.click()
# 记忆封面保存失败提示
elif image.find(R.Common.ButtonClose):
elif image.raw().find(img, R.Common.ButtonClose):
logger.info("Memory cover save failed. Click to close.")
device.click()
# 结算完毕
elif image.find(R.InPurodyuusu.ButtonNextNoIcon):
logger.info("Finalize")
device.click()
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
device.click(image.expect_wait(R.InPurodyuusu.ButtonComplete))
# 关注提示
# if image.wait_for(R.InPurodyuusu.ButtonFollowProducer, timeout=2):
# device.click(image.expect_wait(R.InPurodyuusu.ButtonCancel))
elif gen_btn := ocr.raw().find(img, contains("生成")):
logger.info("Generate memory cover completed.")
device.click(gen_btn)
break
# 开始生成记忆
# elif image.find(R.InPurodyuusu.ButtonGenerateMemory):
# logger.info("Click generate memory button")
# device.click()
# 跳过结算内容
else:
device.click_center()
sleep(2)
# 后续动画
logger.info("Waiting for memory generation animation completed...")
while not image.find(R.InPurodyuusu.ButtonNextNoIcon):
device.click_center()
sleep(1)
# 结算完毕
logger.info("Finalize")
# [screenshots/produce_end/end_next_1.jpg]
logger.debug("Click next 1")
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
sleep(1.3)
# [screenshots/produce_end/end_next_2.png]
logger.debug("Click next 2")
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
sleep(1.3)
# [screenshots/produce_end/end_next_3.png]
logger.debug("Click next 3")
device.click(image.expect_wait(R.InPurodyuusu.ButtonNextNoIcon))
sleep(1.3)
# [screenshots/produce_end/end_complete.png]
logger.debug("Click complete")
device.click(image.expect_wait(R.InPurodyuusu.ButtonComplete))
sleep(1.3)
# 活动进度
# [screenshots/produce_end/end_activity.png]
# [screenshots/produce_end/end_activity1.png]
while not at_home():
if image.find(R.Common.ButtonClose):
logger.info("Activity award claim dialog found. Click to close.")
device.click()
elif image.find(R.Common.ButtonNextNoIcon):
logger.debug("Click next")
device.click(image.expect_wait(R.Common.ButtonNextNoIcon))
else:
device.click_center()
sleep(1)
logger.info("Produce completed.")
# 关注提示
# if image.wait_for(R.InPurodyuusu.ButtonFollowProducer, timeout=2):
# device.click(image.expect_wait(R.InPurodyuusu.ButtonCancel))
@action('执行 Regular 培育')
def hajime_regular(week: int = -1, start_from: int = 1):
@ -530,105 +581,6 @@ def hajime_regular(week: int = -1, start_from: int = 1):
:param week: 第几周从1开始-1表示全部
:param start_from: 从第几周开始从1开始
"""
def week1():
"""
第一周 期中考试剩余5周\n
行动Vo.レッスンDa.レッスンVi.レッスン
"""
enter_recommended_action()
loading.wait_loading_start()
logger.info("Loading...")
loading.wait_loading_end()
logger.info("Loading end")
# 支援卡判断
practice()
def week2():
"""
第二周 期中考试剩余4周\n
行动授業学习
"""
# 点击“授業”
rect = image.expect_wait(R.InPurodyuusu.Action.ActionStudy).rect
device.click(rect)
sleep(0.5)
device.click(rect)
# 等待加载
loading.wait_loading_start()
logger.info("Loading...")
# 等待加载结束
loading.wait_loading_end()
logger.info("Loading end")
# 判断是否触发支援卡剧情
# TODO:检查是否有支援卡要领取的技能卡
# 等待加载
loading.wait_loading_start()
logger.info("Loading...")
# 等待加载结束
loading.wait_loading_end()
logger.info("Loading end")
# 进入授業页面
pos = image.expect_wait(R.InPurodyuusu.Action.VocalWhiteBg).rect
device.click(pos)
sleep(0.5)
device.click(pos)
# 选择选项
# TODO: 不固定点击 Vocal
device.double_click(image.expect_wait(R.InPurodyuusu.Action.VocalWhiteBg).rect)
# 领取技能卡
acquire_skill_card()
# 三次加载画面
loading.wait_loading_start()
logger.info("Loading 1...")
loading.wait_loading_end()
logger.info("Loading 1 end")
loading.wait_loading_start()
logger.info("Loading 2...")
loading.wait_loading_end()
logger.info("Loading 2 end")
loading.wait_loading_start()
logger.info("Loading 3...")
loading.wait_loading_end()
logger.info("Loading 3 end")
def week3():
"""
第三周 期中考试剩余3周\n
行动Vo.レッスンDa.レッスンVi.レッスン授業
"""
week1()
def week4():
"""
第四周 期中考试剩余2周\n
行动おでかけ相談活動支給
"""
week3()
def week5():
"""TODO"""
def week6():
"""期中考试"""
def week7():
"""第七周 期末考试剩余6周"""
if not enter_recommended_action():
rest()
def week8():
"""
第八周 期末考试剩余5周\n
行动授業活動支給
"""
if not enter_recommended_action():
rest()
def week_lesson():
until_action_scene()
executed_action = enter_recommended_action()
@ -646,7 +598,9 @@ def hajime_regular(week: int = -1, start_from: int = 1):
def week_non_lesson():
"""非练习周。可能可用行动包括:おでかけ、相談、活動支給、授業"""
until_action_scene()
if allowance_available():
if enter_recommended_action() == 'rest':
logger.info("Recommended action is rest.")
elif allowance_available():
enter_allowance()
# elif study_available():
# enter_study()
@ -678,7 +632,8 @@ def hajime_regular(week: int = -1, start_from: int = 1):
logger.info("Exam scene detected.")
sleep(5)
device.click_center()
sleep(5)
sleep(0.5)
loading.wait_loading_end()
exam()
produce_end()
@ -697,6 +652,10 @@ def hajime_regular(week: int = -1, start_from: int = 1):
week_final_lesson, # 12: 追い込みレッスン
week_final_exam, # 13: 最終試験
]
if week not in [6, 13] and start_from not in [6, 13]:
until_action_scene()
else:
until_exam_scene()
if week != -1:
logger.info("Week %d started.", week)
weeks[week - 1]()
@ -719,19 +678,51 @@ def purodyuusu(
__actions__ = [enter_recommended_action]
if __name__ == '__main__':
from kotonebot.backend.context import init_context
from logging import getLogger
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
getLogger('kotonebot').setLevel(logging.DEBUG)
getLogger(__name__).setLevel(logging.DEBUG)
init_context()
while not image.wait_for_any([
R.InPurodyuusu.TextPDiary, # 普通周
R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
], timeout=2):
logger.info("Action scene not detected. Retry...")
acquisitions()
sleep(3)
exam()
produce_end()
# import cProfile
# p = cProfile.Profile()
# p.enable()
# acquisitions()
# p.disable()
# p.print_stats()
# p.dump_stats('profile.prof')
# until_action_scene()
# # 第一个箱子 [screenshots\allowance\step_2.png]
# logger.info("Clicking on the first lootbox.")
# device.click(image.expect_wait_any([
# R.InPurodyuusu.LootboxSliverLock
# ]))
# while acquisitions() is None:
# logger.info("Waiting for acquisitions finished.")
# sleep(2)
# # 第二个箱子
# logger.info("Clicking on the second lootbox.")
# device.click(image.expect_wait_any([
# R.InPurodyuusu.LootboxSliverLock
# ]))
# while acquisitions() is None:
# logger.info("Waiting for acquisitions finished.")
# sleep(2)
# logger.info("活動支給 completed.")
# while not image.wait_for_any([
# R.InPurodyuusu.TextPDiary, # 普通周
# R.InPurodyuusu.ButtonFinalPracticeDance # 离考试剩余一周
# ], timeout=2):
# logger.info("Action scene not detected. Retry...")
# acquisitions()
# sleep(3)
# image.wait_for_any([
# R.InPurodyuusu.TextPDiary, # 普通周

View File

@ -7,6 +7,7 @@ import numpy as np
from kotonebot import image, device, debug, action
from kotonebot.backend.debug import result
from .. import R
logger = getLogger(__name__)
@ -16,8 +17,8 @@ def loading() -> bool:
original_img = img.copy()
# 二值化图片
_, img = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)
# 裁剪上面 10%
img = img[:int(img.shape[0] * 0.1), :]
# 裁剪上面 20%
img = img[:int(img.shape[0] * 0.2), :]
# 判断图片中颜色数量是否 <= 2
# https://stackoverflow.com/questions/56606294/count-number-of-unique-colours-in-image
b,g,r = cv2.split(img)
@ -43,9 +44,14 @@ def wait_loading_end(timeout: float = 60):
while loading():
if time.time() - start_time > timeout:
raise TimeoutError('加载超时')
# 检查网络错误
if image.find(R.Common.TextNetworkError):
device.click(image.expect(R.Common.ButtonRetry))
logger.debug('Loading...')
sleep(1)
if __name__ == '__main__':
from kotonebot.backend.context import init_context
init_context()
print(loading())
input()

View File

@ -8,6 +8,7 @@ from logging import getLogger
from kotonebot import device, image, ocr, debug, action
from kotonebot.tasks import R
from ..actions.loading import wait_loading_end, wait_loading_start
from .common import acquisitions, AcquisitionType
logger = getLogger(__name__)
@ -46,7 +47,11 @@ def enter_allowance():
logger.info("Double clicking on 活動支給.")
device.double_click(image.expect(R.InPurodyuusu.ButtonTextAllowance), interval=1)
# 等待进入页面
sleep(3)
wait_loading_end()
# 处理可能会出现的支援卡奖励
while not image.find(R.InPurodyuusu.IconTitleAllowance):
logger.debug("Waiting for 活動支給 screen.")
acquisitions()
# 第一个箱子 [screenshots\allowance\step_2.png]
logger.info("Clicking on the first lootbox.")
device.click(image.expect_wait_any([
@ -64,6 +69,9 @@ def enter_allowance():
logger.info("Waiting for acquisitions finished.")
sleep(2)
logger.info("活動支給 completed.")
# wait_loading_start() # 可能会因为加载太快,截图没截到,导致抛出异常
sleep(1)
wait_loading_end()
# 可能会出现的新动画
# 技能卡:[screenshots\allowance\step_4.png]

View File

@ -25,7 +25,7 @@ def list_pdorinku() -> list[tuple[str, Rect]]:
"""
# 截图所有饮料
# TODO: 自动记录未知饮料
dorinkus = image.find_crop_many(
dorinkus = image.find_all_crop(
R.InPurodyuusu.Action.PDorinkuBg,
mask=R.InPurodyuusu.Action.PDorinkuBgMask,
)

View File

@ -5,32 +5,10 @@ from typing import Callable
from .. import R
from .loading import loading
from kotonebot import device, image, action, cropped, UnrecoverableError
from kotonebot import device, image, action, cropped, UnrecoverableError, until
logger = logging.getLogger(__name__)
def until(
condition: Callable[[], bool],
timeout: float=10,
interval: float=0.5,
critical: bool=False
) -> bool:
"""
等待条件成立如果条件不成立则返回 False 或抛出异常
:param condition: 条件函数
:param timeout: 等待时间单位为秒
:param interval: 检查条件的时间间隔单位为秒
:param critical: 如果条件不成立是否抛出异常
"""
start = time.time()
while not condition():
if time.time() - start > timeout:
if critical:
raise TimeoutError(f"Timeout while waiting for condition {condition.__name__}.")
return False
time.sleep(interval)
return True
@action('检测是否位于首页')
def at_home() -> bool:

View File

@ -56,7 +56,7 @@ def assign(type: Literal['mini', 'online']) -> bool:
attempts = 0
while not selected:
# 寻找所有好调图标
results = image.find_many(R.Daily.IconAssignKouchou, threshold=0.7)
results = image.find_all(R.Daily.IconAssignKouchou, threshold=0.8)
results.sort(key=lambda r: tuple(r.position))
results.pop(0) # 第一个是说明文字里的图标
# 尝试点击所有目标

View File

@ -0,0 +1,75 @@
from time import sleep
import logging
from kotonebot import device, image, ocr, task, action
from . import R
from .actions.scenes import loading, at_home, goto_home
from .actions.loading import wait_loading_end
from .actions.in_purodyuusu import hajime_regular
logger = logging.getLogger(__name__)
@task('培育')
def produce():
"""进行培育流程"""
if not at_home():
goto_home()
# [screenshots/produce/home.png]
device.click(image.expect_wait(R.Produce.ButtonProduce))
sleep(0.3)
wait_loading_end()
# [screenshots/produce/regular_or_pro.png]
device.click(image.expect_wait(R.Produce.ButtonRegular))
sleep(0.3)
wait_loading_end()
# 选择 PIdol [screenshots/produce/select_p_idol.png]
device.click(image.expect_wait(R.Common.ButtonNextNoIcon))
sleep(0.1)
# 选择支援卡 自动编成 [screenshots/produce/select_support_card.png]
device.click(image.expect_wait(R.Produce.ButtonAutoSet))
sleep(0.1)
device.click(image.expect_wait(R.Common.ButtonConfirm, colored=True))
sleep(1.3)
device.click(image.expect_wait(R.Common.ButtonNextNoIcon))
# 选择回忆 自动编成 [screenshots/produce/select_memory.png]
device.click(image.expect_wait(R.Produce.ButtonAutoSet))
sleep(1.3)
device.click(image.expect_wait(R.Common.ButtonConfirm, colored=True))
sleep(0.1)
device.click(image.expect_wait(R.Common.ButtonNextNoIcon))
sleep(0.6)
# 不租赁回忆提示弹窗 [screenshots/produce/no_rent_memory_dialog.png]
with device.pinned():
if image.find(R.Produce.TextRentAvailable):
device.click(image.expect(R.Common.ButtonNextNoIcon))
sleep(0.3)
# 选择道具 [screenshots/produce/select_end.png]
# CONFIG:
device.click(image.expect_wait(R.Produce.CheckboxIconNoteBoost))
sleep(0.1)
device.click(image.expect_wait(R.Produce.CheckboxIconSupportPtBoost))
sleep(0.1)
device.click(image.expect_wait(R.Produce.ButtonProduceStart))
sleep(0.5)
while not loading():
# 跳过交流设置 [screenshots/produce/skip_commu.png]
with device.pinned():
if image.find(R.Produce.RadioTextSkipCommu):
device.click()
sleep(0.2)
if image.find(R.Common.ButtonConfirmNoIcon):
device.click()
wait_loading_end()
hajime_regular()
if __name__ == '__main__':
from kotonebot.backend.context import init_context
import logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
logging.getLogger('kotonebot').setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
init_context()
produce()

View File

@ -19,7 +19,7 @@ def money_items():
"""
logger.info(f'Purchasing マニー items.')
# [screenshots\shop\money1.png]
results = image.find_many(R.Daily.TextShopRecommended)
results = image.find_all(R.Daily.TextShopRecommended)
# device.click(results[0])
index = 1
for index, result in enumerate(results, 1):
@ -54,7 +54,7 @@ def ap_items(item_indices: list[int]):
"""
# [screenshots\shop\ap1.png]
logger.info(f'Purchasing AP items.')
results = image.find_many(R.Daily.IconShopAp, threshold=0.7)
results = image.find_all(R.Daily.IconShopAp, threshold=0.7)
sleep(1)
# 按 X, Y 坐标排序从小到大
results = sorted(results, key=lambda x: (x.position[0], x.position[1]))

View File

@ -1,13 +0,0 @@
from kotonebot.backend.util import TaskInfo
from kotonebot.backend.context import device
def purodyuusu(
):
pass
__task__ = TaskInfo(
name="purodyuusu",
description="进行一次培养",
entry=purodyuusu
)

View File

@ -7,6 +7,7 @@ from . import R
from .common import Priority
from .actions.loading import loading
from .actions.scenes import at_home, goto_home
from .actions.commu import is_at_commu, check_and_skip_commu
logger = logging.getLogger(__name__)
@task('启动游戏', priority=Priority.START_GAME)
@ -41,6 +42,9 @@ def start_game():
# [screenshots/startup/announcement1.png]
elif image.find(R.Common.ButtonIconClose):
device.click()
# [screenshots/startup/birthday.png]
elif check_and_skip_commu():
pass
else:
device.click_center()
wait()

View File

@ -1,4 +1,10 @@
"""消息框、通知、推送等 UI 相关函数"""
import logging
from typing import Callable
from cv2.typing import MatLike
logger = logging.getLogger(__name__)
def ask(
question: str,
@ -11,8 +17,21 @@ def ask(
"""
raise NotImplementedError
def info(
message: str,
images: list[MatLike],
*,
once: bool = False
):
"""
信息
"""
logger.debug('user.info: %s', message)
def warning(
message: str,
images: list[MatLike],
*,
once: bool = False
):
"""
@ -21,4 +40,4 @@ def warning(
:param message: 消息内容
:param once: 每次运行是否只显示一次
"""
pass
logger.warning('user.warning: %s', message)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.7 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 221 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 188 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 749 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 830 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.0 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 924 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 497 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 662 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 471 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 675 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 501 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 790 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 956 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 890 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 996 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 955 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 391 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 194 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 200 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 417 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 546 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 828 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 839 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 360 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 699 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 585 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 859 KiB

View File

@ -86,3 +86,59 @@ class TestColor(BaseTestCase):
img = _img_rgb_to_bgr(img)
self.assertEqual(find_rgb(img, target_color, threshold=threshold), (x, y), f'color={color}')
def test_hsv_web2cv(self):
test_cases = [
# (h, s, v) Web format -> (h, s, v) OpenCV format
((0, 0, 0), (0, 0, 0)),
((360, 100, 100), (180, 255, 255)),
((180, 50, 50), (90, 128, 128)),
((240, 75, 80), (120, 191, 204)),
]
from kotonebot.backend.color import hsv_web2cv
for input_hsv, expected in test_cases:
self.assertEqual(hsv_web2cv(*input_hsv), expected, f'input={input_hsv}')
def test_hsv_cv2web(self):
test_cases = [
# (h, s, v) OpenCV -> (h, s, v) Web format
((0, 0, 0), (0, 0, 0)),
((180, 255, 255), (360, 100, 100)),
((90, 128, 128), (180, 50, 50)),
((120, 191, 204), (240, 75, 80)),
]
from kotonebot.backend.color import hsv_cv2web
for input_hsv, expected in test_cases:
self.assertEqual(hsv_cv2web(*input_hsv), expected, f'input={input_hsv}')
def test_rgb_to_hsv(self):
test_cases = [
# RGB -> HSV (web format)
('#000000', (0, 0, 0)),
('#FFFFFF', (0, 0, 100)),
('#FF0000', (0, 100, 100)),
('#00FF00', (120, 100, 100)),
('#0000FF', (240, 100, 100)),
# RGB tuple -> HSV
((255, 0, 0), (0, 100, 100)),
((0, 255, 0), (120, 100, 100)),
((0, 0, 255), (240, 100, 100)),
((128, 128, 128), (0, 0, 50)),
]
from kotonebot.backend.color import rgb_to_hsv
for input_rgb, expected in test_cases:
self.assertEqual(rgb_to_hsv(input_rgb), expected, f'input={input_rgb}')
def test_hsv_to_rgb(self):
test_cases = [
# HSV (Web format) -> RGB
((0, 0, 0), (0, 0, 0)),
((0, 100, 100), (255, 0, 0)),
((120, 100, 100), (0, 255, 0)),
((240, 100, 100), (0, 0, 255)),
((60, 100, 100), (255, 255, 0)),
((180, 100, 100), (0, 255, 255)),
]
from kotonebot.backend.color import hsv_to_rgb
for input_hsv, expected in test_cases:
self.assertEqual(hsv_to_rgb(input_hsv), expected, f'input={input_hsv}')

View File

@ -0,0 +1 @@

View File

@ -2,7 +2,7 @@ import unittest
import cv2
from kotonebot.backend.image import template_match, find_crop
from kotonebot.backend.image import template_match, find_all_crop
def save(image, name: str):
import os
@ -58,7 +58,7 @@ class TestTemplateMatch(unittest.TestCase):
self.__assert_pos(result[2], 444, 829)
def test_crop(self):
result = find_crop(
result = find_all_crop(
self.image,
self.template,
self.mask,

68
tools/copy_img.py Normal file
View File

@ -0,0 +1,68 @@
import json
import os
import shutil
import argparse
from pathlib import Path
def process_file(input_file: str, output_dir: str):
"""处理输入文件并复制图片到输出目录"""
input_path = Path(input_file)
if not input_path.exists():
print(f"错误:输入文件 {input_file} 不存在")
return
# 确保输出目录存在
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# 获取输入文件所在目录
input_dir = input_path.parent
image_ids = []
total_lines = 0
print(f"开始处理文件:{input_file}")
# 读取并处理每一行
with open(input_file, 'r', encoding='utf-8') as f:
for line in f:
total_lines += 1
try:
data = json.loads(line.strip())
if 'image' in data and 'value' in data['image']:
value_array = data['image']['value']
if value_array and len(value_array) > 0:
image_ids.append(value_array[-1])
except json.JSONDecodeError as e:
print(f"警告:第 {total_lines} 行解析失败:{e}")
print(f"文件总行数:{total_lines}")
print(f"找到的图片ID数量{len(image_ids)}")
# 复制图片
copied_count = 0
for img_id in image_ids:
source_file = input_dir / f"{img_id}.png"
target_file = output_path / f"{img_id}.png"
if source_file.exists():
try:
shutil.copy2(source_file, target_file)
copied_count += 1
except Exception as e:
print(f"警告:复制图片 {img_id}.png 失败:{e}")
else:
print(f"警告:源图片不存在:{source_file}")
print(f"成功复制的图片数量:{copied_count}")
def main():
parser = argparse.ArgumentParser(description='从 Dump JSON 文件中提取所有原图并复制相应的 PNG 文件到目标目录中')
parser.add_argument('input', help='输入 Dump JSON 文件的路径')
parser.add_argument('-o', '--out-dir', required=True, help='图片输出目录')
args = parser.parse_args()
process_file(args.input, args.out_dir)
if __name__ == '__main__':
main()