feat(core): add a visual debugging page for displaying detection results

XcantloadX 2025-01-09 20:58:37 +08:00
parent c8736e3ab5
commit f0a3dadd71
19 changed files with 1131 additions and 37 deletions
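
The commit wires together a result store (vars.py), a FastAPI + WebSocket server (server.py, entry.py) and a browser viewer (ui.html). A minimal usage sketch, not part of the commit, assuming the package layout added below (`kotonebot.backend.debug`) and a local screenshot file:

```python
# Sketch only: emit one detection result to the debug page.
# "screenshot.png" is an assumed local file, not something from this commit.
import cv2
from kotonebot.backend.debug import debug, result

debug.enabled = True                      # off by default; entry.py switches it on
frame = cv2.imread("screenshot.png")
result("demo.manual", frame, "hello from the debug page")
# With `python -m kotonebot.backend.debug.entry <task module>` running,
# the entry shows up at http://127.0.0.1:8000 through the WebSocket feed.
```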

.vscode/launch.json

@ -19,6 +19,14 @@
"request": "launch",
"console": "integratedTerminal",
"module": "${command:extension.commandvariable.file.relativeDirDots}.${fileBasenameNoExtension}",
},
{
"name": "KotonebotDebug: Current Module",
"type": "debugpy",
"request": "launch",
"console": "integratedTerminal",
"module": "kotonebot.backend.debug.entry",
"args": ["${command:extension.commandvariable.file.relativeDirDots}.${fileBasenameNoExtension}"]
}
]
}

(4 deleted binary image files not shown; sizes 935 B, 9.4 KiB, 6.7 KiB, 7.2 KiB)

@ -1,20 +1,65 @@
import cv2
from kotonebot.client.device.fast_screenshot import AdbFastScreenshots
import dearpygui.dearpygui as dpg
import cv2 as cv
import numpy as np
with AdbFastScreenshots(
adb_path=r"D:\SDK\Android\platform-tools\adb.exe",
device_serial="127.0.0.1:16384",
time_interval=179,
width=720,
height=1280,
bitrate="5M",
use_busybox=False,
connect_to_device=True,
screenshotbuffer=10,
go_idle=0,
) as adbscreen:
for image in adbscreen:
cv2.imshow("CV2 WINDOW", image)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
cv2.destroyAllWindows()
dpg.create_context()
dpg.create_viewport(title='Custom Title', width=600, height=800)
dpg.setup_dearpygui()
vid = cv.VideoCapture(0)
ret, frame = vid.read()
# image size (could also be taken from the frame shape)
frame_width = vid.get(cv.CAP_PROP_FRAME_WIDTH)
frame_height = vid.get(cv.CAP_PROP_FRAME_HEIGHT)
video_fps = vid.get(cv.CAP_PROP_FPS)
print(frame_width)
print(frame_height)
print(video_fps)
print("Frame Array:")
print("Array is of type: ", type(frame))
print("No. of dimensions: ", frame.ndim)
print("Shape of array: ", frame.shape)
print("Size of array: ", frame.size)
print("Array stores elements of type: ", frame.dtype)
data = np.flip(frame, 2) # because the camera data comes in as BGR and we need RGB
data = data.ravel()  # flatten the camera data to a 1-D structure
data = np.asarray(data, dtype=np.float32) # change data type to 32bit floats
texture_data = np.true_divide(data, 255.0) # normalize image data to prepare for GPU
print("texture_data Array:")
print("Array is of type: ", type(texture_data))
print("No. of dimensions: ", texture_data.ndim)
print("Shape of array: ", texture_data.shape)
print("Size of array: ", texture_data.size)
print("Array stores elements of type: ", texture_data.dtype)
with dpg.texture_registry(show=True):
dpg.add_raw_texture(frame.shape[1], frame.shape[0], texture_data, tag="texture_tag", format=dpg.mvFormat_Float_rgb)
with dpg.window(label="Example Window"):
dpg.add_text("Hello, world")
dpg.add_image("texture_tag")
dpg.show_metrics()
dpg.show_viewport()
while dpg.is_dearpygui_running():
    # Updating the texture inside the while loop limits the frame rate to the camera's frame rate.
    # Commenting out the "ret, frame = vid.read()" line shows the full speed at which texture updates can run.
ret, frame = vid.read()
data = np.flip(frame, 2)
data = data.ravel()
data = np.asarray(data, dtype=np.float32)
texture_data = np.true_divide(data, 255.0)
dpg.set_value("texture_tag", texture_data)
    # to compare with the base example in the OpenCV tutorials, uncomment below
#cv.imshow('frame', frame)
dpg.render_dearpygui_frame()
vid.release()
#cv.destroyAllWindows()  # also call this when using the OpenCV "imshow" window
dpg.destroy_context()


@ -0,0 +1 @@
from .vars import result, debug, img


@ -0,0 +1,28 @@
import sys
import runpy
from threading import Thread
from typing import Callable
from . import debug
def _task_thread(task_module: str):
    """Task thread."""
runpy.run_module(task_module, run_name="__main__")
def _start_task_thread():
    """Start the task thread."""
module = sys.argv[1]
thread = Thread(target=_task_thread, args=(module,))
thread.start()
if __name__ == "__main__":
debug.enabled = True
    # Start the server
    from .server import app
    import uvicorn
    # Start the task thread
    _start_task_thread()
    # Start the server
uvicorn.run(app, host="127.0.0.1", port=8000)
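
The VS Code configuration above launches this entry module with the current file's dotted module path as its only argument. A sketch of the equivalent manual launch; the task module name is hypothetical:

```python
# Sketch: start the debug entry from a script instead of VS Code.
import subprocess
import sys

subprocess.run([
    sys.executable, "-m", "kotonebot.backend.debug.entry",
    "kotonebot.tasks.my_task",  # hypothetical task module
])
```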


@ -0,0 +1,151 @@
from pathlib import Path
from collections import deque
import asyncio
import cv2
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.responses import FileResponse, Response
from fastapi.staticfiles import StaticFiles
from . import vars
app = FastAPI()
# Directory of this file and the app's working directory
CURRENT_DIR = Path(__file__).parent
APP_DIR = Path.cwd()
# Mount static files
app.mount("/static", StaticFiles(directory=str(CURRENT_DIR)), name="static")
@app.get("/")
async def get_root():
    """Return the UI page."""
return FileResponse(CURRENT_DIR / "ui.html")
@app.get("/api/read_file")
async def read_file(path: str):
    """Read a file's contents."""
try:
        # Make sure the path is inside the app directory
full_path = (APP_DIR / path).resolve()
if not Path(full_path).is_relative_to(APP_DIR):
raise HTTPException(status_code=403, detail="Access denied")
if not full_path.exists():
raise HTTPException(status_code=404, detail="File not found")
        # Add cache-control headers
        headers = {
            "Cache-Control": "public, max-age=3600",  # cache for 1 hour
            "ETag": f'"{hash(full_path)}"'  # use the hash of full_path as the ETag
}
return FileResponse(full_path, headers=headers)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/read_memory")
async def read_memory(key: str):
    """Read data held in memory."""
print('read_memory', key)
try:
image = None
if key in vars._results:
image = vars._results[key].image
elif key in vars._images:
image = vars._images[key]
else:
raise HTTPException(status_code=404, detail="Key not found")
        # Encode the image
encode_params = [cv2.IMWRITE_JPEG_QUALITY, 85, cv2.IMWRITE_JPEG_PROGRESSIVE, 1]
_, buffer = cv2.imencode('.jpg', image, encode_params)
        # Add cache-control headers
        headers = {
            "Cache-Control": "public, max-age=3600",  # cache for 1 hour
            "ETag": f'"{hash(key)}"'  # use the hash of key as the ETag
}
return Response(
buffer.tobytes(),
media_type="image/jpeg",
headers=headers
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/ping")
async def ping():
return {"status": "ok"}
message_queue = deque()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
if len(message_queue) > 0:
message = message_queue.pop()
# print('send message', message)
await websocket.send_json(message)
await asyncio.sleep(0.1)
# data = await websocket.receive_text()
# try:
# message = json.loads(data)
# if message["type"] == "visual":
            # # Handle a visualization message
# image_data = message["data"]["image"]
# if image_data["type"] == "path":
            # # Read from a file
# full_path = (CURRENT_DIR / image_data["value"]).resolve()
# if not str(full_path).startswith(str(CURRENT_DIR)):
# await websocket.send_json({"error": "Access denied"})
# continue
# if not full_path.exists():
# await websocket.send_json({"error": "File not found"})
# continue
# with open(full_path, 'rb') as f:
# content = base64.b64encode(f.read()).decode()
# else: # memory
# key = image_data["value"]
# if key not in vars._results:
# await websocket.send_json({"error": "Key not found"})
# continue
# result = vars._results[key]
# _, buffer = cv2.imencode('.png', result.image)
# content = base64.b64encode(buffer).decode()
            # # Send the response
# await websocket.send_json({
# "type": "visual",
# "data": {
# "image": content,
# "name": message["data"]["name"],
# "details": message["data"]["details"]
# }
# })
# except json.JSONDecodeError:
# await websocket.send_json({"error": "Invalid JSON"})
# except Exception as e:
# await websocket.send_json({"error": str(e)})
except:
await websocket.close()
# Used by the result() function in vars.py
def send_ws_message(title: str, image: str, text: str = ''):
    """Send a WebSocket message."""
    # This function is called from vars.py
message = {
"type": "visual",
"data": {
"image": {
"type": "memory",
"value": image
},
"name": title,
"details": text
}
}
    # Queue the message; the WebSocket loop sends it to all connected clients
message_queue.append(message)
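
The `/ws` endpoint only pushes queued messages; the page never has to send anything back. A minimal listener sketch using the `websockets` package pinned in requirements.txt, assuming the server is running at the address configured in entry.py:

```python
# Sketch: print every visualization message pushed by the debug server.
import asyncio
import json

import websockets


async def listen():
    async with websockets.connect("ws://127.0.0.1:8000/ws") as ws:
        async for raw in ws:
            msg = json.loads(raw)
            if msg.get("type") == "visual":
                data = msg["data"]
                # The image itself is fetched separately via /api/read_memory or /api/read_file.
                print(data["name"], data["image"]["type"], data["image"]["value"])


asyncio.run(listen())
```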


@ -0,0 +1,597 @@
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>图片查看器</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
body, html {
height: 100%;
margin: 0;
padding: 0;
}
.container-fluid {
height: 100%;
padding: 0;
}
.main-row {
height: 100%;
margin: 0;
}
.image-viewer-col {
height: 100%;
padding: 0;
transition: width 0.3s ease;
}
.image-viewer {
height: 100%;
border: 1px solid #ddd;
padding: 20px;
display: flex;
flex-direction: column;
}
.image-container {
flex: 1;
border: 1px solid #eee;
margin-bottom: 20px;
display: flex;
align-items: center;
justify-content: center;
overflow: hidden;
position: relative;
}
.image-container img {
max-width: none;
max-height: none;
transform-origin: center center;
cursor: grab;
user-select: none;
position: relative;
}
.image-container img.dragging {
cursor: grabbing;
}
.info-panel-col {
height: 100%;
padding: 0;
transition: none;
position: relative;
}
        /* Resize handle styles */
.resize-handle {
position: absolute;
left: 0;
top: 0;
bottom: 0;
width: 4px;
background-color: transparent;
cursor: col-resize;
z-index: 1001;
transition: background-color 0.2s;
}
.resize-handle:hover,
.resize-handle.dragging {
background-color: #0d6efd;
}
.method-panel {
height: 100%;
border: 1px solid #ddd;
border-left: none;
padding: 15px;
position: relative;
display: flex;
flex-direction: column;
}
.toggle-panel-btn {
position: absolute;
right: 15px;
top: 15px;
z-index: 1000;
}
.navigation-buttons {
text-align: center;
margin-top: 20px;
}
.navigation-buttons .btn {
margin: 0 5px;
min-width: 40px;
}
.navigation-buttons .btn:first-child {
margin-right: 5px;
}
.expand-panel-btn {
position: fixed;
right: 15px;
top: 15px;
z-index: 1000;
            display: none; /* hidden initially */
}
        /* Optional: slightly enlarge the button on hover */
.expand-panel-btn:hover {
transform: scale(1.1);
transition: transform 0.2s;
}
.zoom-controls {
text-align: center;
margin: 10px 0;
}
.zoom-controls .btn {
margin: 0 5px;
}
.zoom-level {
display: inline-block;
margin: 0 10px;
min-width: 60px;
}
.connection-status {
position: fixed;
top: 0;
left: 0;
right: 0;
background-color: #dc3545;
color: white;
text-align: center;
padding: 8px;
font-weight: bold;
z-index: 2000;
display: none;
opacity: 0;
transition: opacity 0.3s ease;
}
.connection-status.show {
display: block;
opacity: 1;
}
        /* Download button styles */
.download-btn {
opacity: 0.7;
transition: opacity 0.2s;
}
.download-btn:hover {
opacity: 1;
}
#customMessage {
flex: 1;
overflow-y: auto;
margin-top: 10px;
padding: 10px;
white-space: pre-wrap;
word-wrap: break-word;
}
        /* Result table. Used from the Python code */
.result-table {
text-align: center;
width: 100%;
border-collapse: collapse;
border: 1px solid #ddd;
}
.result-table td {
border: 1px solid #ddd;
padding: 5px;
}
</style>
</head>
<body>
    <!-- Connection status banner -->
<div class="connection-status" id="connectionStatus">
WebSocket 连接已断开,正在尝试重新连接...
</div>
    <!-- Panel expand button -->
<button class="btn btn-sm btn-secondary expand-panel-btn" id="expandPanel">
<i class="bi bi-layout-sidebar"></i>
</button>
<div class="container-fluid">
<div class="row main-row">
<div class="col image-viewer-col" id="imageViewerCol">
<div class="image-viewer">
<div class="image-container" id="imageViewer">
                        <!-- The image is displayed here -->
</div>
<div class="zoom-controls">
<button class="btn btn-sm btn-outline-secondary" id="zoomOutBtn" alt="缩小">
<i class="bi bi-zoom-out"></i>
</button>
<span class="zoom-level" id="zoomLevel">100%</span>
<button class="btn btn-sm btn-outline-secondary" id="zoomInBtn" alt="放大">
<i class="bi bi-zoom-in"></i>
</button>
<button class="btn btn-sm btn-outline-secondary" id="resetZoomBtn" alt="重置缩放">
<i class="bi bi-arrow-counterclockwise"></i>
</button>
<button class="btn btn-sm btn-outline-secondary" id="downloadBtn" alt="下载图片">
<i class="bi bi-download"></i>
</button>
</div>
<div class="navigation-buttons">
<button class="btn btn-sm btn-outline-secondary" id="firstBtn" alt="第一张">
<i class="bi bi-chevron-bar-left"></i>
</button>
<button class="btn btn-sm btn-outline-secondary" id="prevBtn" alt="上一张">
<i class="bi bi-chevron-left"></i>
</button>
<span id="imageCount" style="margin: 0 10px">0/0</span>
<button class="btn btn-sm btn-outline-secondary" id="nextBtn" alt="下一张">
<i class="bi bi-chevron-right"></i>
</button>
<button class="btn btn-sm btn-outline-secondary" id="lastBtn" alt="最后一张">
<i class="bi bi-chevron-bar-right"></i>
</button>
</div>
</div>
</div>
            <!-- Info panel -->
<div class="col-md-4 info-panel-col" id="infoPanelCol">
<div class="resize-handle" id="resizeHandle"></div>
<div class="method-panel">
<button class="btn btn-sm btn-secondary toggle-panel-btn" id="togglePanel">
<i class="bi bi-x-lg"></i>
</button>
<h4 style="margin-top: 40px">&lt;method name&gt;</h4>
<div id="customMessage" style="white-space: pre-wrap;">
&lt;Custom message&gt;
</div>
</div>
</div>
</div>
</div>
    <!-- Bootstrap Icons -->
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.7.2/font/bootstrap-icons.css">
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
<script>
$(document).ready(function() {
let ws = null;
let currentScale = 1.0;
const ZOOM_STEP = 0.1;
const MIN_SCALE = 0.1;
const MAX_SCALE = 5.0;
let isDragging = false;
let startX, startY;
let translateX = 0, translateY = 0;
let isReconnecting = false;
let imageHistory = [];
let currentImageIndex = -1;
let isResizing = false;
let startWidth = 0;
        let minWidth = 200; // minimum width
        let maxWidth = 800; // maximum width
        // Show/hide the connection status banner
function showConnectionStatus(show) {
const status = $('#connectionStatus');
if (show) {
status.addClass('show');
} else {
status.removeClass('show');
}
}
        // Poll the server status
async function checkServerStatus() {
try {
const response = await fetch('/api/ping');
if (response.ok) {
                    // Server is back; reload the page
                    window.location.reload();
                }
            } catch (e) {
                // Server still unavailable; keep retrying
setTimeout(checkServerStatus, 2000);
}
}
        // Update the image counter
function updateImageCount() {
$('#imageCount').text(`${currentImageIndex + 1}/${imageHistory.length}`);
$('#firstBtn').prop('disabled', currentImageIndex <= 0);
$('#prevBtn').prop('disabled', currentImageIndex <= 0);
$('#nextBtn').prop('disabled', currentImageIndex >= imageHistory.length - 1);
$('#lastBtn').prop('disabled', currentImageIndex >= imageHistory.length - 1);
}
        // Show an image from the history
function showHistoryImage(index) {
if (index < 0 || index >= imageHistory.length) return;
currentImageIndex = index;
const imageData = imageHistory[index];
            // Update the image
const imageContainer = $('#imageViewer');
const img = $('<img>');
if (imageData.type === 'memory') {
img.attr('src', `/api/read_memory?key=${imageData.value}`);
} else if (imageData.type === 'file') {
img.attr('src', `/api/read_file?path=${imageData.value}`);
}
imageContainer.empty().append(img);
            // Update the title and details
            $('h4').text(imageData.name);
            $('#customMessage').html(imageData.details);
            // Apply the current zoom and pan
updateImageTransform(img);
updateZoomLevel();
            // Initialize drag events
            initDragEvents();
            // Update the counter
            updateImageCount();
        }
        // Connect the WebSocket
function connectWebSocket() {
if (isReconnecting) return;
isReconnecting = true;
showConnectionStatus(true);
ws = new WebSocket(`ws://${window.location.host}/ws`);
ws.onopen = function() {
console.log('WebSocket connected');
isReconnecting = false;
showConnectionStatus(false);
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log(data);
if (data.error) {
console.error('WebSocket error:', data.error);
return;
}
if (data.type === 'visual') {
                    // Save the image data to the history
imageHistory.push({
type: data.data.image.type,
value: data.data.image.value,
name: data.data.name,
details: data.data.details
});
                    // Only auto-show the new image if the last image is currently being viewed
                    if (currentImageIndex === imageHistory.length - 2 || currentImageIndex === -1) {
                        currentImageIndex = imageHistory.length - 1;
                        showHistoryImage(currentImageIndex);
                    } else {
                        // Otherwise just update the counter and button states
updateImageCount();
}
}
};
ws.onclose = function() {
console.log('WebSocket disconnected');
showConnectionStatus(true);
            // Start polling the server status
checkServerStatus();
};
ws.onerror = function(error) {
console.error('WebSocket error:', error);
showConnectionStatus(true);
};
}
        // Update the zoom level display
        function updateZoomLevel() {
            $('#zoomLevel').text(Math.round(currentScale * 100) + '%');
        }
        // Update the image transform
        function updateImageTransform(img) {
            img.css('transform', `translate(${translateX}px, ${translateY}px) scale(${currentScale})`);
        }
        // Set the image scale
function setImageScale(scale) {
currentScale = Math.min(Math.max(scale, MIN_SCALE), MAX_SCALE);
const img = $('#imageViewer img');
updateImageTransform(img);
updateZoomLevel();
}
        // Initialize drag events
function initDragEvents() {
const container = $('#imageViewer');
const img = container.find('img');
img.on('mousedown', function(e) {
isDragging = true;
startX = e.clientX - translateX;
startY = e.clientY - translateY;
img.addClass('dragging');
});
$(document).on('mousemove', function(e) {
if (!isDragging) return;
translateX = e.clientX - startX;
translateY = e.clientY - startY;
updateImageTransform(img);
});
$(document).on('mouseup', function() {
if (!isDragging) return;
isDragging = false;
img.removeClass('dragging');
});
            // Prevent text selection while dragging
container.on('dragstart', function(e) {
e.preventDefault();
});
}
        // Zoom button events
        $('#zoomInBtn').click(() => setImageScale(currentScale + ZOOM_STEP));
        $('#zoomOutBtn').click(() => setImageScale(currentScale - ZOOM_STEP));
        $('#resetZoomBtn').click(() => setImageScale(1.0));
        // Mouse wheel zoom
$('#imageViewer').on('wheel', function(e) {
e.preventDefault();
const delta = e.originalEvent.deltaY;
setImageScale(currentScale + (delta > 0 ? -ZOOM_STEP : ZOOM_STEP));
});
        // Panel toggle
$('#togglePanel').click(function() {
const infoPanel = $('#infoPanelCol');
const imageViewer = $('#imageViewerCol');
const expandBtn = $('#expandPanel');
if (infoPanel.is(':visible')) {
infoPanel.hide();
imageViewer.removeClass('col').addClass('col-12');
expandBtn.show();
}
});
$('#expandPanel').click(function() {
const infoPanel = $('#infoPanelCol');
const imageViewer = $('#imageViewerCol');
$(this).hide();
imageViewer.removeClass('col-12').addClass('col');
infoPanel.show();
});
        // Navigation button events
        $('#firstBtn').click(() => showHistoryImage(0));
        $('#prevBtn').click(() => showHistoryImage(currentImageIndex - 1));
        $('#nextBtn').click(() => showHistoryImage(currentImageIndex + 1));
        $('#lastBtn').click(() => showHistoryImage(imageHistory.length - 1));
        // Initialize the width-resize drag handle
function initResizeHandle() {
const handle = $('#resizeHandle');
const panel = $('#infoPanelCol');
handle.on('mousedown', function(e) {
isResizing = true;
startWidth = panel.width();
startX = e.clientX;
handle.addClass('dragging');
e.preventDefault();
});
$(document).on('mousemove', function(e) {
if (!isResizing) return;
const delta = startX - e.clientX;
let newWidth = startWidth + delta;
                // Clamp to the minimum and maximum widths
                newWidth = Math.max(minWidth, Math.min(maxWidth, newWidth));
                // Update the panel width
                panel.css('width', newWidth + 'px');
                panel.css('flex', 'none'); // keep flex layout from overriding the width
});
$(document).on('mouseup', function() {
if (!isResizing) return;
isResizing = false;
handle.removeClass('dragging');
});
}
        // Download the current image
function downloadCurrentImage() {
if (currentImageIndex < 0 || currentImageIndex >= imageHistory.length) return;
const imageData = imageHistory[currentImageIndex];
const img = $('#imageViewer img');
const link = document.createElement('a');
            // Use the original image URL
            link.href = img.attr('src');
            // Build the file name
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const extension = imageData.type === 'memory' ? 'jpg' : 'png';
link.download = `${imageData.name}_${timestamp}.${extension}`;
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
}
        // Keyboard controls
        $(document).keydown(function(e) {
            // Ignore keyboard events while typing in an input
if (e.target.tagName === 'INPUT' || e.target.tagName === 'TEXTAREA') return;
switch(e.key) {
case 'ArrowLeft':
if (!$('#prevBtn').prop('disabled')) {
showHistoryImage(currentImageIndex - 1);
}
break;
case 'ArrowRight':
if (!$('#nextBtn').prop('disabled')) {
showHistoryImage(currentImageIndex + 1);
}
break;
case 'Home':
if (!$('#firstBtn').prop('disabled')) {
showHistoryImage(0);
}
break;
case 'End':
if (!$('#lastBtn').prop('disabled')) {
showHistoryImage(imageHistory.length - 1);
}
break;
}
});
        // Download button event
        $('#downloadBtn').click(downloadCurrentImage);
        // Initialize everything
connectWebSocket();
initResizeHandle();
});
</script>
<script>
        // Communication code
</script>
</body>
</html>


@ -0,0 +1,106 @@
from pathlib import Path
import re
import time
import uuid
import traceback
from datetime import datetime
from typing import Callable, NamedTuple
import cv2
import numpy as np
from cv2.typing import MatLike
class Result(NamedTuple):
title: str
image: MatLike
text: str
class _Vars:
def __init__(self):
self.__enabled: bool = False
self.__max_results: int = 200
@property
    def enabled(self) -> bool:
        """Whether displaying debug results is enabled."""
return self.__enabled
@enabled.setter
def enabled(self, value: bool):
self.__enabled = value
@property
    def max_results(self) -> int:
        """Maximum number of results to keep."""
return self.__max_results
@max_results.setter
def max_results(self, value: int):
self.__max_results = value
debug = _Vars()
# TODO: Memory needs to be released at some point. Which entries should be dropped?
_results: dict[str, Result] = {}
_images: dict[str, MatLike] = {}
"""Dictionary holding temporary images."""
def img(image: str | MatLike | None) -> str:
"""
    Embed an image in the text passed to `result()`.
    :param image: image path or an OpenCV image object
    :return: HTML code for the image
"""
if image is None:
return 'None'
elif isinstance(image, str):
return f'<img src="/api/read_file?path={image}" />'
else:
key = str(uuid.uuid4())
_images[key] = image
return f'<img src="/api/read_memory?key={key}" />'
# TODO: Keep the original image. Store the original as PNG and compress the result as JPEG.
def result(
title: str,
image: MatLike,
text: str = ''
):
"""
    Display an image result.
```python
result(
"image.find",
image,
f"template: {img(template)} \\n"
f"matches: {len(matches)} \\n"
)
```
    :param title: Title; the `module.method` format is recommended.
    :param image: The image.
    :param text: Detail text. May contain HTML; spaces and line breaks are preserved. Use `img()` to embed images.
"""
if not debug.enabled:
return
key = 'result_' + title + '_' + str(time.time())
_results[key] = Result(title, image, text)
if len(_results) > debug.max_results:
_results.pop(next(iter(_results)))
    # Assemble the message
now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-4]
callstack = [frame.replace(str(Path.cwd()), '.') for frame in traceback.format_stack()
if not re.search(r'Python\d*[\/\\]lib|debugpy', frame)]
final_text = (
f"Time: {now_time}\n" +
f"\n{text}\n" +
f"<details><summary>Callstack</summary>{''.join(callstack)}</details>"
)
    # Send the WebSocket message
from .server import send_ws_message
send_ws_message(title, key, final_text)
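
Besides the docstring example above, `img()` accepts either a file path (served back through `/api/read_file`) or an in-memory OpenCV image (registered in `_images` and served through `/api/read_memory`). A small sketch with assumed file names:

```python
# Sketch: embed a template image into the HTML details of a result.
# Both PNG paths are assumed examples, not files from this commit.
import cv2
from kotonebot.backend.debug import debug, img, result

debug.enabled = True
screenshot = cv2.imread("screenshot.png")
template = cv2.imread("button.png")
result(
    "demo.img_embed",
    screenshot,
    f"in-memory template: {img(template)} \n"
    f"same template by path: {img('button.png')} \n"
)
```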


@ -2,6 +2,8 @@ import os
from typing import NamedTuple, Protocol, TypeVar
from logging import getLogger
from .debug import result, debug, img
import cv2
import numpy as np
from cv2.typing import MatLike, Rect, Point, Size
@ -20,6 +22,7 @@ class ResultProtocol(Protocol):
score: float
position: Point
class TemplateMatchResult(NamedTuple):
score: float
position: Point
@ -66,6 +69,16 @@ def _remove_duplicate_matches(
result.append(match)
return result
def _draw_result(image: MatLike, matches: list[TemplateMatchResult] | TemplateMatchResult | None) -> MatLike:
if matches is None:
return image
if isinstance(matches, TemplateMatchResult):
matches = [matches]
result_image = image.copy()
for match in matches:
cv2.rectangle(result_image, match.rect, (0, 0, 255), 2)
return result_image
def template_match(
template: MatLike | str,
image: MatLike | str,
@ -152,9 +165,19 @@ def find(
mask: MatLike | str | None = None,
transparent: bool = False,
threshold: float = 0.8,
*,
debug_output: bool = True,
) -> TemplateMatchResult | None:
    """Find a template image."""
matches = template_match(template, image, mask, transparent, threshold, max_results=-1)
    # Debug output
    if debug.enabled and debug_output:
        result_image = _draw_result(image, matches)
        result_text = f"template: {img(template)} \n"
        result_text += f"matches: {len(matches)} \n"
        for match in matches:
            result_text += f"score: {match.score} position: {match.position} size: {match.size} \n"
        result("image.find", result_image, result_text)
return matches[0] if len(matches) > 0 else None
def find_any(
@ -165,15 +188,32 @@ def find_any(
threshold: float = 0.8,
) -> TemplateMatchResult | None:
    """Try multiple templates and return the first match."""
ret = None
if masks is None:
_masks = [None] * len(templates)
else:
_masks = masks
for template, mask in zip(templates, _masks):
ret = find(image, template, mask, transparent, threshold)
ret = find(image, template, mask, transparent, threshold, debug_output=False)
        # Debug output
if ret is not None:
return ret
return None
break
if debug.enabled:
msg = (
"<table class='result-table'>" +
"<tr><th>Template</th><th>Mask</th><th>Result</th></tr>" +
"\n".join([
f"<tr><td>{img(t)}</td><td>{img(m)}</td><td>{'' if ret and t == templates[0] else ''}</td></tr>"
for t, m in zip(templates, _masks)
]) +
"</table>\n"
)
result(
'image.find_any',
_draw_result(image, ret),
msg
)
return ret
def count(
image: MatLike,
@ -186,11 +226,19 @@ def count(
results = template_match(template, image, mask, transparent, threshold, max_results=-1)
if remove_duplicate:
results = _remove_duplicate_matches(results)
    # Draw the results
# for result in results:
# cv2.rectangle(image, result.rect, (0, 0, 255), 2)
# cv2.imshow('count', image)
# cv2.waitKey(0)
if debug.enabled:
result_image = _draw_result(image, results)
result(
'image.count',
result_image,
(
f"template: {img(template)} \n"
f"mask: {img(mask)} \n"
f"transparent: {transparent} \n"
f"threshold: {threshold} \n"
f"count: {len(results)} \n"
)
)
return len(results)
def expect(
@ -201,7 +249,21 @@ def expect(
threshold: float = 0.9,
) -> TemplateMatchResult:
ret = find(image, template, mask, transparent, threshold)
if debug.enabled:
result(
'image.expect',
_draw_result(image, ret),
            (
                f"template: {img(template)} \n"
                f"mask: {img(mask)} \n"
                f"args: transparent={transparent} threshold={threshold} \n"
                f"result: {ret} " +
                ('<span class="text-success">SUCCESS</span>' if ret is not None
                 else '<span class="text-danger">FAILED</span>')
            )
)
if ret is None:
raise TemplateNotFoundError(image, template)
return ret
else:
return ret
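
With `debug.enabled` set, the matchers above report into the debug page, and `find()` gains a `debug_output` flag so callers such as `find_any()` can suppress the per-call entry. A usage sketch; the `kotonebot.backend.image` import path and the image files are assumptions:

```python
# Sketch: one find() call that lands in the debug page, one that stays silent.
import cv2
from kotonebot.backend import image  # assumed import path for this module
from kotonebot.backend.debug import debug

debug.enabled = True
screen = cv2.imread("screen.png")    # placeholder files
button = cv2.imread("button.png")

match = image.find(screen, button, threshold=0.8)       # emits an "image.find" result
quiet = image.find(screen, button, debug_output=False)  # suppressed, as find_any() does internally
```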


@ -4,10 +4,11 @@ import unicodedata
from os import PathLike
from typing import TYPE_CHECKING, Callable, NamedTuple, overload
from kotonebot.backend.util import Rect
from .util import Rect
from .debug import result as debug_result, debug
if TYPE_CHECKING:
from cv2.typing import MatLike
import cv2
from cv2.typing import MatLike
from rapidocr_onnxruntime import RapidOCR
_engine_jp = RapidOCR(
@ -48,6 +49,66 @@ def _is_match(text: str, pattern: re.Pattern | str | StringMatchFunction) -> boo
else:
return text == pattern
def _draw_result(image: 'MatLike', result: list[OcrResult]) -> 'MatLike':
import numpy as np
from PIL import Image, ImageDraw, ImageFont
    # Convert to a PIL image
    result_image = cv2.cvtColor(image.copy(), cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(result_image)
    draw = ImageDraw.Draw(pil_image, 'RGBA')
    # Load the font
try:
font = ImageFont.truetype(r'res\fonts\SourceHanSansHW-Regular.otf', 16)
except:
font = ImageFont.load_default()
for r in result:
        # Draw the bounding box
draw.rectangle(
[r.rect[0], r.rect[1], r.rect[0] + r.rect[2], r.rect[1] + r.rect[3]],
outline=(255, 0, 0),
width=2
)
        # Measure the text
        text = r.text + f" ({r.confidence:.2f})"  # append the confidence
        text_bbox = draw.textbbox((0, 0), text, font=font)
        text_width = text_bbox[2] - text_bbox[0]
        text_height = text_bbox[3] - text_bbox[1]
        # Compute the text position
        text_x = r.rect[0]
        text_y = r.rect[1] - text_height - 5 if r.rect[1] > text_height + 5 else r.rect[1] + r.rect[3] + 5
        # Add padding
padding = 4
bg_rect = [
text_x - padding,
text_y - padding,
text_x + text_width + padding,
text_y + text_height + padding
]
        # Draw a semi-transparent background
        draw.rectangle(
            bg_rect,
            fill=(0, 0, 0, 128)
        )
        # Draw the text
draw.text(
(text_x, text_y),
text,
font=font,
fill=(255, 255, 255)
)
    # Convert back to OpenCV format
result_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
return result_image
class Ocr:
def __init__(self, engine: RapidOCR):
self.__engine = engine
@ -64,7 +125,7 @@ class Ocr:
result, elapse = self.__engine(img_content)
if result is None:
return []
return [OcrResult(
ret = [OcrResult(
            text=unicodedata.normalize('NFKC', r[1]).replace('ą', 'a'),  # HACK: the recognition output contains odd symbols; replace them for now
            rect=(
                int(r[0][0][0]),  # top-left x
@ -74,6 +135,17 @@ class Ocr:
),
confidence=r[2] # type: ignore
) for r in result] # type: ignore
if debug.enabled:
result_image = _draw_result(img, ret)
debug_result(
'ocr',
result_image,
f"result: \n" + \
"<table class='result-table'><tr><th>Text</th><th>Confidence</th></tr>" + \
"\n".join([f"<tr><td>{r.text}</td><td>{r.confidence:.2f}</td></tr>" for r in ret]) + \
"</table>"
)
return ret
def find(self, img: 'MatLike', text: str | re.Pattern | StringMatchFunction) -> OcrResult | None:
"""


@ -751,12 +751,27 @@ if __name__ == '__main__':
getLogger(__name__).setLevel(logging.DEBUG)
init_context()
# produce_end()
while not image.wait_for_any([
        R.InPurodyuusu.TextPDiary,  # normal week
        R.InPurodyuusu.ButtonFinalPracticeDance  # one week left before the exam
], timeout=2):
logger.info("Action scene not detected. Retry...")
acquisitions()
sleep(3)
# image.wait_for_any([
    # R.InPurodyuusu.TextPDiary,  # normal week
    # R.InPurodyuusu.ButtonFinalPracticeDance  # one week left before the exam
# ], timeout=2)
# while True:
# sleep(10)
# exam()
# produce_end()
# enter_recommended_action()
# remaing_turns_and_points()
practice()
until_action_scene()
# practice()
# until_action_scene()
# acquisitions()
# acquire_pdorinku(0)
# image.wait_for(R.InPurodyuusu.InPractice.PDorinkuIcon)


@ -1,2 +1,2 @@
jinja2==3.1.5
pyinstaller==6.11.1
pyinstaller==6.11.1


@ -1,6 +1,15 @@
adbutils==2.8.0
# Image processing & OCR
opencv-python==4.10.0.84
rapidocr_onnxruntime==1.4.3
av==14.0.1
thefuzz==0.22.1
# ADB control
adbutils==2.8.0
# Visual debugging page
fastapi==0.115.6
uvicorn==0.34.0
python-multipart==0.0.20
websockets==14.1
numpy==2.2.1
# Misc
typing-extensions==4.12.2

(3 changed binary image files not shown; new sizes 2.7 KiB and 1.9 KiB)