feat(core): 新增 uiautomator2 控制与截图方法

This commit is contained in:
XcantloadX 2025-02-12 10:50:24 +08:00
parent f1a05e8cfb
commit fa55b6d871
15 changed files with 943 additions and 415 deletions

View File

@ -3,7 +3,7 @@ import cv2
from cv2.typing import MatLike
import numpy as np
from typing import NamedTuple
from kotonebot.client.device.fast_screenshot import AdbFastScreenshots
from kotonebot.client.fast_screenshot import AdbFastScreenshots
def cv_imread(filePath):

View File

@ -24,10 +24,9 @@ from typing_extensions import deprecated
import cv2
from cv2.typing import MatLike
from kotonebot.client.protocol import DeviceABC
from kotonebot.client.device import Device
from kotonebot.backend.util import Rect
import kotonebot.backend.image as raw_image
from kotonebot.client.device.adb import AdbDevice
from kotonebot.backend.image import (
TemplateMatchResult,
MultipleTemplateMatchResult,
@ -42,6 +41,7 @@ from kotonebot.backend.image import (
import kotonebot.backend.color as raw_color
from kotonebot.backend.color import find_rgb
from kotonebot.backend.ocr import Ocr, OcrResult, OcrResultList, jp, en, StringMatchFunction
from kotonebot.client.factory import create_device
from kotonebot.config.manager import load_config, save_config
from kotonebot.config.base_config import UserConfig
from kotonebot.backend.core import Image, HintBox
@ -639,8 +639,8 @@ class Forwarded:
setattr(self._FORWARD_getter(), name, value)
# HACK: 这应该要有个更好的实现方式
class ContextDevice(DeviceABC):
def __init__(self, device: DeviceABC):
class ContextDevice(Device):
def __init__(self, device: Device):
self._device = device
def screenshot(self):
@ -677,18 +677,16 @@ class Context(Generic[T]):
self.__vars = ContextGlobalVars()
self.__debug = ContextDebug(self)
self.__config = ContextConfig[T](self, config_type)
from adbutils import adb
ip = self.config.current.backend.adb_ip
port = self.config.current.backend.adb_port
adb.connect(f'{ip}:{port}')
# TODO: 处理链接失败情况
d = [d for d in adb.device_list() if d.serial == f'{ip}:{port}']
self.__device = ContextDevice(AdbDevice(d[0]))
self.__device = ContextDevice(create_device(f'{ip}:{port}', 'adb'))
def inject(
self,
*,
device: Optional[ContextDevice | DeviceABC] = None,
device: Optional[ContextDevice | Device] = None,
ocr: Optional[ContextOcr] = None,
image: Optional[ContextImage] = None,
color: Optional[ContextColor] = None,
@ -697,7 +695,7 @@ class Context(Generic[T]):
config: Optional[ContextConfig] = None,
):
if device is not None:
if isinstance(device, DeviceABC):
if isinstance(device, Device):
self.__device = ContextDevice(device)
else:
self.__device = device
@ -805,7 +803,7 @@ def init_context(
def inject_context(
*,
device: Optional[ContextDevice | DeviceABC] = None,
device: Optional[ContextDevice | Device] = None,
ocr: Optional[ContextOcr] = None,
image: Optional[ContextImage] = None,
color: Optional[ContextColor] = None,

View File

@ -12,7 +12,7 @@ import cv2
from cv2.typing import MatLike
if TYPE_CHECKING:
from kotonebot.client.protocol import DeviceABC
from kotonebot.client.protocol import Device
from kotonebot.backend.color import HsvColor
from .core import Image
@ -56,7 +56,7 @@ def crop_rect(img: MatLike, rect: Rect) -> MatLike:
class DeviceHookContextManager:
def __init__(
self,
device: 'DeviceABC',
device: 'Device',
*,
screenshot_hook_before: Callable[[], MatLike|None] | None = None,
screenshot_hook_after: Callable[[MatLike], MatLike] | None = None,
@ -86,7 +86,7 @@ class DeviceHookContextManager:
self.device.click_hooks_before.remove(self.click_hook_before)
def cropped(
device: 'DeviceABC',
device: 'Device',
x1: float = 0,
y1: float = 0,
x2: float = 1,

View File

@ -1 +1 @@
from .protocol import DeviceABC
from .device import Device

394
kotonebot/client/device.py Normal file
View File

@ -0,0 +1,394 @@
import logging
from typing import Callable, cast, Literal, overload
from typing_extensions import override, deprecated
import numpy as np
import cv2
from cv2.typing import MatLike
from adbutils import AdbClient, adb
from adbutils._device import AdbDevice
from kotonebot.backend.core import HintBox
from kotonebot.backend.util import Rect, is_rect
from .protocol import ClickableObjectProtocol, Commandable, Touchable, Screenshotable
logger = logging.getLogger(__name__)
# class AdbDevice(DeviceABC):
# def __init__(self, device: Device) -> None:
# super().__init__()
# self.device = device
# @override
# def launch_app(self, package_name: str) -> None:
# self.device.shell(f"monkey -p {package_name} 1")
# @override
# def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float|None = None) -> None:
# if duration is not None:
# logger.warning("Swipe duration is not supported with AdbDevice. Ignoring duration.")
# self.device.shell(f"input touchscreen swipe {x1} {y1} {x2} {y2}")
# @override
# def screenshot(self) -> MatLike:
# if self.screenshot_hook_before is not None:
# logger.debug("execute screenshot hook before")
# img = self.screenshot_hook_before()
# if img is not None:
# logger.debug("screenshot hook before returned image")
# return img
# img = self.screenshot_raw()
# if self.screenshot_hook_after is not None:
# img = self.screenshot_hook_after(img)
# return img
# @override
# def screenshot_raw(self) -> MatLike:
# return cv2.cvtColor(np.array(self.device.screenshot()), cv2.COLOR_RGB2BGR)
class HookContextManager:
def __init__(self, device: 'Device', func: Callable[[MatLike], MatLike]):
self.device = device
self.func = func
self.old_func = device.screenshot_hook_after
def __enter__(self):
self.device.screenshot_hook_after = self.func
return self
def __exit__(self, exc_type, exc_value, traceback):
self.device.screenshot_hook_after = self.old_func
class PinContextManager:
def __init__(self, device: 'Device'):
self.device = device
self.old_hook = device.screenshot_hook_before
self.memo = None
def __hook(self) -> MatLike:
if self.memo is None:
self.memo = self.device.screenshot_raw()
return self.memo
def __enter__(self):
self.device.screenshot_hook_before = self.__hook
return self
def __exit__(self, exc_type, exc_value, traceback):
self.device.screenshot_hook_before = self.old_hook
def update(self) -> None:
"""
更新记住的截图
"""
self.memo = self.device.screenshot_raw()
class Device:
def __init__(self, adb_connection: AdbDevice | None = None) -> None:
self._adb: AdbDevice | None = adb_connection
self.screenshot_hook_after: Callable[[MatLike], MatLike] | None = None
"""截图后调用的函数"""
self.screenshot_hook_before: Callable[[], MatLike | None] | None = None
"""截图前调用的函数。返回修改后的截图。"""
self.click_hooks_before: list[Callable[[int, int], tuple[int, int]]] = []
"""点击前调用的函数。返回修改后的点击坐标。"""
self.last_find: Rect | ClickableObjectProtocol | None = None
"""上次 image 对象或 ocr 对象的寻找结果"""
self.orientation: Literal['portrait', 'landscape'] = 'portrait'
"""
设备当前方向默认为竖屏
横屏时为 'landscape'竖屏时为 'portrait'
"""
self._command: Commandable
self._touch: Touchable
self._screenshot: Screenshotable
@property
def adb(self) -> AdbDevice:
if self._adb is None:
raise ValueError("AdbClient is not connected")
return self._adb
@adb.setter
def adb(self, value: AdbDevice) -> None:
self._adb = value
def launch_app(self, package_name: str) -> None:
"""
根据包名启动 app
"""
self._command.launch_app(package_name)
@overload
def click(self) -> None:
"""
点击上次 `image` 对象或 `ocr` 对象的寻找结果仅包括返回单个结果的函数
不包括 `image.raw()` `ocr.raw()` 的结果
如果没有上次寻找结果或上次寻找结果为空会抛出异常 ValueError
"""
...
@overload
def click(self, x: int, y: int) -> None:
"""
点击屏幕上的某个点
"""
...
@overload
def click(self, hint_box: HintBox) -> None:
"""
点击屏幕上的某个矩形区域
"""
...
@overload
def click(self, rect: Rect) -> None:
"""
从屏幕上的某个矩形区域随机选择一个点并点击
"""
...
@overload
def click(self, clickable: ClickableObjectProtocol) -> None:
"""
点击屏幕上的某个可点击对象
"""
...
def click(self, *args, **kwargs) -> None:
arg1 = args[0] if len(args) > 0 else None
arg2 = args[1] if len(args) > 1 else None
if arg1 is None:
self.__click_last()
elif isinstance(arg1, HintBox):
self.__click_hint_box(arg1)
elif is_rect(arg1):
self.__click_rect(arg1)
elif isinstance(arg1, int) and isinstance(arg2, int):
self.__click_point(arg1, arg2)
elif isinstance(arg1, ClickableObjectProtocol):
self.__click_clickable(arg1)
else:
raise ValueError(f"Invalid arguments: {arg1}, {arg2}")
def __click_last(self) -> None:
if self.last_find is None:
raise ValueError("No last find result. Make sure you are not calling the 'raw' functions.")
self.click(self.last_find)
def __click_rect(self, rect: Rect) -> None:
# 从矩形中心的 60% 内部随机选择一点
x = rect[0] + rect[2] // 2 + np.random.randint(-int(rect[2] * 0.3), int(rect[2] * 0.3))
y = rect[1] + rect[3] // 2 + np.random.randint(-int(rect[3] * 0.3), int(rect[3] * 0.3))
x = int(x)
y = int(y)
self.click(x, y)
def __click_point(self, x: int, y: int) -> None:
for hook in self.click_hooks_before:
logger.debug(f"Executing click hook before: ({x}, {y})")
x, y = hook(x, y)
logger.debug(f"Click hook before result: ({x}, {y})")
logger.debug(f"Click: {x}, {y}")
self._touch.click(x, y)
def __click_clickable(self, clickable: ClickableObjectProtocol) -> None:
self.click(clickable.rect)
def __click_hint_box(self, hint_box: HintBox) -> None:
self.click(hint_box.rect)
def click_center(self) -> None:
"""
点击屏幕中心
此方法会受到 `self.orientation` 的影响
调用前确保 `orientation` 属性与设备方向一致
否则点击位置会不正确
"""
x, y = self.screen_size[0] // 2, self.screen_size[1] // 2
self.click(x, y)
@overload
def double_click(self, x: int, y: int, interval: float = 0.25) -> None:
"""
双击屏幕上的某个点
"""
...
@overload
def double_click(self, rect: Rect, interval: float = 0.25) -> None:
"""
双击屏幕上的某个矩形区域
"""
...
@overload
def double_click(self, clickable: ClickableObjectProtocol, interval: float = 0.25) -> None:
"""
双击屏幕上的某个可点击对象
"""
...
def double_click(self, *args, **kwargs) -> None:
from kotonebot import sleep
arg0 = args[0]
if is_rect(arg0) or isinstance(arg0, ClickableObjectProtocol):
rect = arg0
interval = kwargs.get('interval', 0.25)
self.click(rect)
sleep(interval)
self.click(rect)
else:
x = args[0]
y = args[1]
interval = kwargs.get('interval', 0.25)
self.click(x, y)
sleep(interval)
self.click(x, y)
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float|None = None) -> None:
"""
滑动屏幕
"""
self._touch.swipe(x1, y1, x2, y2, duration)
def swipe_scaled(self, x1: float, y1: float, x2: float, y2: float, duration: float|None = None) -> None:
"""
滑动屏幕参数为屏幕坐标的百分比
:param x1: 起始点 x 坐标百分比范围 [0, 1]
:param y1: 起始点 y 坐标百分比范围 [0, 1]
:param x2: 结束点 x 坐标百分比范围 [0, 1]
:param y2: 结束点 y 坐标百分比范围 [0, 1]
:param duration: 滑动持续时间单位秒None 表示使用默认值
"""
w, h = self.screen_size
self.swipe(int(w * x1), int(h * y1), int(w * x2), int(h * y2), duration)
def screenshot(self) -> MatLike:
"""
截图
"""
if self.screenshot_hook_before is not None:
logger.debug("execute screenshot hook before")
img = self.screenshot_hook_before()
if img is not None:
logger.debug("screenshot hook before returned image")
return img
img = self.screenshot_raw()
if self.screenshot_hook_after is not None:
img = self.screenshot_hook_after(img)
return img
def screenshot_raw(self) -> MatLike:
"""
截图不调用任何 Hook
"""
return self._screenshot.screenshot()
def hook(self, func: Callable[[MatLike], MatLike]) -> HookContextManager:
"""
注册 Hook在截图前将会调用此函数对截图进行处理
"""
return HookContextManager(self, func)
@deprecated('改用 @task/@action 装饰器中的 screenshot_mode 参数')
def pinned(self) -> PinContextManager:
"""
记住下次截图结果并将截图调整为手动挡
之后截图都会返回记住的数据节省重复截图时间
调用返回对象中的 PinContextManager.update() 可以立刻更新记住的截图
"""
return PinContextManager(self)
@property
def screen_size(self) -> tuple[int, int]:
"""
屏幕尺寸格式为 `(width, height)`
**注意** 此属性返回的分辨率会随设备方向变化
如果 `self.orientation` `landscape`则返回的分辨率是横屏下的分辨率
否则返回竖屏下的分辨率
`self.orientation` 属性默认为竖屏如果需要自动检测
调用 `self.detect_orientation()` 方法
如果已知方向也可以直接设置 `self.orientation` 属性
"""
return self._screenshot.screen_size
def current_package(self) -> str | None:
"""
获取前台 APP 的包名
:return: 前台 APP 的包名如果获取失败则返回 None
:exception: 如果设备不支持此功能则抛出 NotImplementedError
"""
return self._command.current_package()
def detect_orientation(self) -> Literal['portrait', 'landscape'] | None:
"""
检测当前设备方向并设置 `self.orientation` 属性
:return: 检测到的方向如果无法检测到则返回 None
"""
return self._screenshot.detect_orientation()
if __name__ == "__main__":
from kotonebot.client.implements.adb import AdbImpl
from .implements.uiautomator2 import UiAutomator2Impl
print("server version:", adb.server_version())
adb.connect("127.0.0.1:5555")
print("devices:", adb.device_list())
d = adb.device_list()[-1]
d.shell("dumpsys activity top | grep ACTIVITY | tail -n 1")
dd = Device(d)
adb_imp = AdbImpl(dd)
dd._command = adb_imp
dd._touch = adb_imp
# dd._screenshot = adb_imp
# dd._screenshot = MinicapScreenshotImpl(dd)
dd._screenshot = UiAutomator2Impl(dd)
# dd.launch_app("com.android.settings")
dd.adb = d
# 实时展示画面
import cv2
import numpy as np
import time
last_time = time.time()
while True:
start_time = time.time()
img = dd.screenshot()
# 50% 缩放
img = cv2.resize(img, (img.shape[1] // 2, img.shape[0] // 2))
# 计算帧间隔
interval = start_time - last_time
fps = 1 / interval if interval > 0 else 0
last_time = start_time
# 获取当前时间和帧率信息
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
fps_text = f"FPS: {fps:.1f} {interval*1000:.1f}ms"
# 在图像上绘制信息
font = cv2.FONT_HERSHEY_SIMPLEX
cv2.putText(img, current_time, (10, 30), font, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
cv2.putText(img, fps_text, (10, 60), font, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
cv2.imshow("screen", img)
cv2.waitKey(1)

View File

@ -1,165 +0,0 @@
import logging
from typing import Callable, cast
from typing_extensions import override
import numpy as np
import cv2
from cv2.typing import MatLike
from adbutils import AdbClient, adb
from adbutils._device import AdbDevice as Device
from kotonebot.backend.core import HintBox
from kotonebot.backend.util import Rect, is_rect
from ..protocol import DeviceABC, ClickableObjectProtocol
logger = logging.getLogger(__name__)
class AdbDevice(DeviceABC):
def __init__(self, device: Device) -> None:
super().__init__()
self.device = device
@override
def launch_app(self, package_name: str) -> None:
self.device.shell(f"monkey -p {package_name} 1")
@override
def click(self, arg1=None, arg2=None) -> None:
if arg1 is None:
self.__click_last()
elif isinstance(arg1, HintBox):
self.__click_hint_box(arg1)
elif is_rect(arg1):
self.__click_rect(arg1)
elif isinstance(arg1, int) and isinstance(arg2, int):
self.__click_point(arg1, arg2)
elif isinstance(arg1, ClickableObjectProtocol):
self.__click_clickable(arg1)
else:
raise ValueError(f"Invalid arguments: {arg1}, {arg2}")
def __click_last(self) -> None:
if self.last_find is None:
raise ValueError("No last find result. Make sure you are not calling the 'raw' functions.")
self.click(self.last_find)
def __click_rect(self, rect: Rect) -> None:
# 从矩形中心的 60% 内部随机选择一点
x = rect[0] + rect[2] // 2 + np.random.randint(-int(rect[2] * 0.3), int(rect[2] * 0.3))
y = rect[1] + rect[3] // 2 + np.random.randint(-int(rect[3] * 0.3), int(rect[3] * 0.3))
x = int(x)
y = int(y)
self.click(x, y)
def __click_point(self, x: int, y: int) -> None:
for hook in self.click_hooks_before:
logger.debug(f"Executing click hook before: ({x}, {y})")
x, y = hook(x, y)
logger.debug(f"Click hook before result: ({x}, {y})")
logger.debug(f"Click: {x}, {y}")
self.device.shell(f"input tap {x} {y}")
def __click_clickable(self, clickable: ClickableObjectProtocol) -> None:
self.click(clickable.rect)
def __click_hint_box(self, hint_box: HintBox) -> None:
self.click(hint_box.rect)
@override
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float|None = None) -> None:
if duration is not None:
logger.warning("Swipe duration is not supported with AdbDevice. Ignoring duration.")
self.device.shell(f"input touchscreen swipe {x1} {y1} {x2} {y2}")
@override
def screenshot(self) -> MatLike:
if self.screenshot_hook_before is not None:
logger.debug("execute screenshot hook before")
img = self.screenshot_hook_before()
if img is not None:
logger.debug("screenshot hook before returned image")
return img
img = self.screenshot_raw()
if self.screenshot_hook_after is not None:
img = self.screenshot_hook_after(img)
return img
@override
def screenshot_raw(self) -> MatLike:
return cv2.cvtColor(np.array(self.device.screenshot()), cv2.COLOR_RGB2BGR)
@property
def screen_size(self) -> tuple[int, int]:
ret = cast(str, self.device.shell("wm size")).strip('Physical size: ')
spiltted = tuple(map(int, ret.split("x")))
landscape = self.orientation == 'landscape'
spiltted = tuple(sorted(spiltted, reverse=landscape))
if len(spiltted) != 2:
raise ValueError(f"Invalid screen size: {ret}")
return spiltted
@staticmethod
def list_devices() -> list[str]:
raise NotImplementedError
@override
def start_app(self, package_name: str) -> None:
self.device.shell(f"monkey -p {package_name} 1")
@override
def detect_orientation(self):
# 判断方向https://stackoverflow.com/questions/10040624/check-if-device-is-landscape-via-adb
# 但是上面这种方法不准确
# 因此这里直接通过截图判断方向
img = self.screenshot()
if img.shape[0] > img.shape[1]:
return 'portrait'
return 'landscape'
@override
def current_package(self) -> str | None:
# https://blog.csdn.net/guangdeshishe/article/details/117154406
result_text = self.device.shell('dumpsys activity top | grep ACTIVITY | tail -n 1')
logger.debug(f"adb returned: {result_text}")
if not isinstance(result_text, str):
logger.error(f"Invalid result_text: {result_text}")
return None
result_text = result_text.strip()
if result_text == '':
logger.error("No current package found")
return None
_, activity, _, pid = result_text.split(' ')
package = activity.split('/')[0]
return package
if __name__ == "__main__":
print("server version:", adb.server_version())
adb.connect("127.0.0.1:16384", )
print("devices:", adb.device_list())
d = adb.device_list()[0]
dd = AdbDevice(d)
# dd.launch_app("com.android.settings")
# 实时展示画面
import cv2
import numpy as np
while True:
img = dd.screenshot()
# img = cv2.imdecode(np.frombuffer(img, np.uint8), cv2.IMREAD_COLOR)
# img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
cv2.imshow("screen", img)
# 50% 缩放
img = cv2.resize(img, (img.shape[1] // 4, img.shape[0] // 4))
# 获取当前时间
import time
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 在图像上绘制时间
font = cv2.FONT_HERSHEY_SIMPLEX
cv2.putText(img, current_time, (10, 30), font, 1, (0, 255, 0), 2, cv2.LINE_AA)
cv2.waitKey(1)

View File

@ -0,0 +1,30 @@
from enum import Enum
from typing import Literal
from .device import Device
from .implements.adb import AdbImpl
from .implements.uiautomator2 import UiAutomator2Impl
from adbutils import adb
DeviceImpl = Literal['adb', 'uiautomator2']
def create_device(
addr: str,
impl: DeviceImpl,
) -> Device:
adb.connect("127.0.0.1:5555")
d = [d for d in adb.device_list() if d.serial == addr]
if len(d) == 0:
raise ValueError(f"Device {addr} not found")
d = d[0]
device = Device(d)
if impl == 'adb':
device._command = AdbImpl(device)
device._touch = AdbImpl(device)
device._screenshot = AdbImpl(device)
elif impl == 'uiautomator2':
device._command = UiAutomator2Impl(device)
device._touch = UiAutomator2Impl(device)
device._screenshot = UiAutomator2Impl(device)
return device

View File

@ -0,0 +1,68 @@
import logging
from typing import cast
from typing_extensions import override
import cv2
import numpy as np
from cv2.typing import MatLike
from ..device import Device
from ..protocol import Commandable, Touchable, Screenshotable
logger = logging.getLogger(__name__)
class AdbImpl(Commandable, Touchable, Screenshotable):
def __init__(self, device: Device):
self.device = device
self.adb = device.adb
@override
def launch_app(self, package_name: str) -> None:
self.adb.shell(f"monkey -p {package_name} 1")
@override
def current_package(self) -> str | None:
# https://blog.csdn.net/guangdeshishe/article/details/117154406
result_text = self.adb.shell('dumpsys activity top | grep ACTIVITY | tail -n 1')
logger.debug(f"adb returned: {result_text}")
if not isinstance(result_text, str):
logger.error(f"Invalid result_text: {result_text}")
return None
result_text = result_text.strip()
if result_text == '':
logger.error("No current package found")
return None
_, activity, _, pid = result_text.split(' ')
package = activity.split('/')[0]
return package
@override
def detect_orientation(self):
# 判断方向https://stackoverflow.com/questions/10040624/check-if-device-is-landscape-via-adb
# 但是上面这种方法不准确
# 因此这里直接通过截图判断方向
img = self.screenshot()
if img.shape[0] > img.shape[1]:
return 'portrait'
return 'landscape'
@property
def screen_size(self) -> tuple[int, int]:
ret = cast(str, self.adb.shell("wm size")).strip('Physical size: ')
spiltted = tuple(map(int, ret.split("x")))
landscape = self.device.orientation == 'landscape'
spiltted = tuple(sorted(spiltted, reverse=landscape))
if len(spiltted) != 2:
raise ValueError(f"Invalid screen size: {ret}")
return spiltted
def screenshot(self) -> MatLike:
return cv2.cvtColor(np.array(self.adb.screenshot()), cv2.COLOR_RGB2BGR)
def click(self, x: int, y: int) -> None:
self.adb.shell(f"input tap {x} {y}")
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float | None = None) -> None:
if duration is not None:
logger.warning("Swipe duration is not supported with AdbDevice. Ignoring duration.")
self.adb.shell(f"input touchscreen swipe {x1} {y1} {x2} {y2}")

View File

@ -0,0 +1,70 @@
from typing import Literal
from ..device import Device
from ..protocol import Screenshotable, Commandable, Touchable
import numpy as np
import uiautomator2 as u2
from cv2.typing import MatLike
class UiAutomator2Impl(Screenshotable, Commandable, Touchable):
def __init__(self, device: Device):
self.device = device
self.u2_client = u2.Device(device.adb.serial)
def screenshot(self) -> MatLike:
"""
截图
"""
image = self.u2_client.screenshot(format='opencv')
assert isinstance(image, np.ndarray)
return image
@property
def screen_size(self) -> tuple[int, int]:
info = self.u2_client.info
sizes = info['displayWidth'], info['displayHeight']
if self.device.orientation == 'landscape':
return (max(sizes), min(sizes))
else:
return (min(sizes), max(sizes))
def detect_orientation(self) -> Literal['portrait', 'landscape'] | None:
"""
检测设备方向
"""
orientation = self.u2_client.info['displayRotation']
if orientation == 1:
return 'portrait'
elif orientation == 0:
return 'landscape'
else:
return None
def launch_app(self, package_name: str) -> None:
"""
启动应用
"""
self.u2_client.app_start(package_name)
def current_package(self) -> str | None:
"""
获取当前应用包名
"""
try:
result = self.u2_client.app_current()
return result['package']
except:
return None
def click(self, x: int, y: int) -> None:
"""
点击屏幕
"""
self.u2_client.click(x, y)
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float|None = None) -> None:
"""
滑动屏幕
"""
self.u2_client.swipe(x1, y1, x2, y2, duration=duration or 0.1)

View File

@ -1,12 +1,10 @@
from time import sleep
from typing import Callable, Protocol, TYPE_CHECKING, overload, runtime_checkable, Literal
from abc import ABC
from typing_extensions import deprecated
from typing import Protocol, TYPE_CHECKING, runtime_checkable, Literal
from cv2.typing import MatLike
from kotonebot.backend.core import HintBox
from kotonebot.backend.util import Rect, is_rect
from kotonebot.backend.util import Rect
if TYPE_CHECKING:
from .device import Device
@runtime_checkable
class ClickableObjectProtocol(Protocol):
@ -24,212 +22,15 @@ class DeviceScreenshotProtocol(Protocol):
"""
...
class HookContextManager:
def __init__(self, device: 'DeviceABC', func: Callable[[MatLike], MatLike]):
self.device = device
self.func = func
self.old_func = device.screenshot_hook_after
def __enter__(self):
self.device.screenshot_hook_after = self.func
return self
def __exit__(self, exc_type, exc_value, traceback):
self.device.screenshot_hook_after = self.old_func
class PinContextManager:
def __init__(self, device: 'DeviceABC'):
self.device = device
self.old_hook = device.screenshot_hook_before
self.memo = None
def __hook(self) -> MatLike:
if self.memo is None:
self.memo = self.device.screenshot_raw()
return self.memo
def __enter__(self):
self.device.screenshot_hook_before = self.__hook
return self
def __exit__(self, exc_type, exc_value, traceback):
self.device.screenshot_hook_before = self.old_hook
def update(self) -> None:
"""
更新记住的截图
"""
self.memo = self.device.screenshot_raw()
class DeviceABC(ABC):
"""
针对单个设备可执行的操作的抽象基类
"""
def __init__(self) -> None:
self.screenshot_hook_after: Callable[[MatLike], MatLike] | None = None
"""截图后调用的函数"""
self.screenshot_hook_before: Callable[[], MatLike | None] | None = None
"""截图前调用的函数。返回修改后的截图。"""
self.click_hooks_before: list[Callable[[int, int], tuple[int, int]]] = []
"""点击前调用的函数。返回修改后的点击坐标。"""
self.last_find: Rect | ClickableObjectProtocol | None = None
"""上次 image 对象或 ocr 对象的寻找结果"""
self.orientation: Literal['portrait', 'landscape'] = 'portrait'
"""
设备当前方向默认为竖屏
横屏时为 'landscape'竖屏时为 'portrait'
"""
@staticmethod
def list_devices() -> list[str]:
...
def launch_app(self, package_name: str) -> None:
"""
根据包名启动 app
"""
...
@overload
def click(self) -> None:
"""
点击上次 `image` 对象或 `ocr` 对象的寻找结果仅包括返回单个结果的函数
不包括 `image.raw()` `ocr.raw()` 的结果
如果没有上次寻找结果或上次寻找结果为空会抛出异常 ValueError
"""
...
@overload
def click(self, x: int, y: int) -> None:
"""
点击屏幕上的某个点
"""
...
@overload
def click(self, hint_box: HintBox) -> None:
"""
点击屏幕上的某个矩形区域
"""
...
@overload
def click(self, rect: Rect) -> None:
"""
从屏幕上的某个矩形区域随机选择一个点并点击
"""
...
@overload
def click(self, clickable: ClickableObjectProtocol) -> None:
"""
点击屏幕上的某个可点击对象
"""
...
def click(self, *args, **kwargs) -> None:
...
def click_center(self) -> None:
"""
点击屏幕中心
此方法会受到 `self.orientation` 的影响
调用前确保 `orientation` 属性与设备方向一致
否则点击位置会不正确
"""
x, y = self.screen_size[0] // 2, self.screen_size[1] // 2
self.click(x, y)
@overload
def double_click(self, x: int, y: int, interval: float = 0.25) -> None:
"""
双击屏幕上的某个点
"""
...
@overload
def double_click(self, rect: Rect, interval: float = 0.25) -> None:
"""
双击屏幕上的某个矩形区域
"""
...
@overload
def double_click(self, clickable: ClickableObjectProtocol, interval: float = 0.25) -> None:
"""
双击屏幕上的某个可点击对象
"""
...
def double_click(self, *args, **kwargs) -> None:
arg0 = args[0]
if is_rect(arg0) or isinstance(arg0, ClickableObjectProtocol):
rect = arg0
interval = kwargs.get('interval', 0.25)
self.click(rect)
sleep(interval)
self.click(rect)
else:
x = args[0]
y = args[1]
interval = kwargs.get('interval', 0.25)
self.click(x, y)
sleep(interval)
self.click(x, y)
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float|None = None) -> None:
"""
滑动屏幕
"""
...
def swipe_scaled(self, x1: float, y1: float, x2: float, y2: float, duration: float|None = None) -> None:
"""
滑动屏幕参数为屏幕坐标的百分比
:param x1: 起始点 x 坐标百分比范围 [0, 1]
:param y1: 起始点 y 坐标百分比范围 [0, 1]
:param x2: 结束点 x 坐标百分比范围 [0, 1]
:param y2: 结束点 y 坐标百分比范围 [0, 1]
:param duration: 滑动持续时间单位秒None 表示使用默认值
"""
w, h = self.screen_size
self.swipe(int(w * x1), int(h * y1), int(w * x2), int(h * y2), duration)
def screenshot(self) -> MatLike:
"""
截图
"""
...
def screenshot_raw(self) -> MatLike:
"""
截图不调用任何 Hook
"""
...
def hook(self, func: Callable[[MatLike], MatLike]) -> HookContextManager:
"""
注册 Hook在截图前将会调用此函数对截图进行处理
"""
return HookContextManager(self, func)
@deprecated('改用 @task/@action 装饰器中的 screenshot_mode 参数')
def pinned(self) -> PinContextManager:
"""
记住下次截图结果并将截图调整为手动挡
之后截图都会返回记住的数据节省重复截图时间
调用返回对象中的 PinContextManager.update() 可以立刻更新记住的截图
"""
return PinContextManager(self)
@runtime_checkable
class Commandable(Protocol):
def __init__(self, device: 'Device'): ...
def launch_app(self, package_name: str) -> None: ...
def current_package(self) -> str | None: ...
@runtime_checkable
class Screenshotable(Protocol):
def __init__(self, device: 'Device'): ...
@property
def screen_size(self) -> tuple[int, int]:
"""
@ -244,26 +45,12 @@ class DeviceABC(ABC):
如果已知方向也可以直接设置 `self.orientation` 属性
"""
...
def detect_orientation(self) -> Literal['portrait', 'landscape'] | None: ...
def screenshot(self) -> MatLike: ...
def current_package(self) -> str | None:
"""
获取前台 APP 的包名
:return: 前台 APP 的包名如果获取失败则返回 None
:exception: 如果设备不支持此功能则抛出 NotImplementedError
"""
...
def detect_orientation(self) -> Literal['portrait', 'landscape'] | None:
"""
检测当前设备方向并设置 `self.orientation` 属性
:return: 检测到的方向如果无法检测到则返回 None
"""
...
def start_app(self, package_name: str) -> None:
"""
启动某个 APP
"""
...
@runtime_checkable
class Touchable(Protocol):
def __init__(self, device: 'Device'): ...
def click(self, x: int, y: int) -> None: ...
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float|None = None) -> None: ...

344
kotonebot/tools/mirror.py Normal file
View File

@ -0,0 +1,344 @@
import wx
import cv2
import numpy as np
import time
from typing import Optional, Tuple, Callable
from threading import Thread, Lock
from cv2.typing import MatLike
from queue import Queue
from kotonebot.client.device import Device
class DeviceMirrorPanel(wx.Panel):
def __init__(self, parent, device: Device, log_callback=None):
super().__init__(parent)
self.device = device
self.screen_bitmap: Optional[wx.Bitmap] = None
self.fps = 0
self.last_frame_time = time.time()
self.frame_count = 0
self.is_running = True
self.lock = Lock()
self.last_mouse_pos = (0, 0)
self.is_dragging = False
self.screenshot_interval = 0 # 截图耗时(ms)
self.log_callback = log_callback
self.operation_queue = Queue()
# 设置背景色为黑色
self.SetBackgroundColour(wx.BLACK)
# 双缓冲,减少闪烁
self.SetDoubleBuffered(True)
# 绑定事件
self.Bind(wx.EVT_PAINT, self.on_paint)
self.Bind(wx.EVT_SIZE, self.on_size)
self.Bind(wx.EVT_LEFT_DOWN, self.on_left_down)
self.Bind(wx.EVT_LEFT_UP, self.on_left_up)
self.Bind(wx.EVT_MOTION, self.on_motion)
# 启动刷新线程
self.update_thread = Thread(target=self.update_screen, daemon=True)
self.update_thread.start()
# 启动操作处理线程
self.operation_thread = Thread(target=self.process_operations, daemon=True)
self.operation_thread.start()
def process_operations(self):
"""处理设备操作的线程"""
while self.is_running:
try:
operation = self.operation_queue.get()
if operation is not None:
operation()
self.operation_queue.task_done()
except Exception as e:
if self.log_callback:
self.log_callback(f"操作执行错误: {e}")
def execute_device_operation(self, operation: Callable):
"""将设备操作添加到队列"""
self.operation_queue.put(operation)
def update_screen(self):
while self.is_running:
try:
# 获取设备截图并计时
start_time = time.time()
frame = self.device.screenshot()
end_time = time.time()
self.screenshot_interval = int((end_time - start_time) * 1000)
if frame is None:
continue
# 计算FPS
current_time = time.time()
self.frame_count += 1
if current_time - self.last_frame_time >= 1.0:
self.fps = self.frame_count
self.frame_count = 0
self.last_frame_time = current_time
# 转换为wx.Bitmap
height, width = frame.shape[:2]
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
wximage = wx.Bitmap.FromBuffer(width, height, frame)
with self.lock:
self.screen_bitmap = wximage
# 请求重绘
wx.CallAfter(self.Refresh)
# 控制刷新率
time.sleep(1/60)
except Exception as e:
print(f"Error updating screen: {e}")
time.sleep(1)
def on_paint(self, event):
dc = wx.BufferedPaintDC(self)
# 清空背景
dc.SetBackground(wx.Brush(wx.BLACK))
dc.Clear()
if not self.screen_bitmap:
return
# 绘制设备画面
with self.lock:
# 计算缩放比例,保持宽高比
panel_width, panel_height = self.GetSize()
bitmap_width = self.screen_bitmap.GetWidth()
bitmap_height = self.screen_bitmap.GetHeight()
scale = min(panel_width/bitmap_width, panel_height/bitmap_height)
scaled_width = int(bitmap_width * scale)
scaled_height = int(bitmap_height * scale)
# 居中显示
x = (panel_width - scaled_width) // 2
y = (panel_height - scaled_height) // 2
if scale != 1:
img = self.screen_bitmap.ConvertToImage()
img = img.Scale(scaled_width, scaled_height, wx.IMAGE_QUALITY_HIGH)
bitmap = wx.Bitmap(img)
else:
bitmap = self.screen_bitmap
dc.DrawBitmap(bitmap, x, y)
# 绘制FPS和截图时间
dc.SetTextForeground(wx.GREEN)
dc.SetFont(wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD))
dc.DrawText(f"FPS: {self.fps}", 10, 10)
dc.DrawText(f"Interval: {self.screenshot_interval}ms", 10, 30)
def on_size(self, event):
self.Refresh()
event.Skip()
def get_device_coordinates(self, x: int, y: int) -> Tuple[int, int]:
"""将面板坐标转换为设备坐标"""
if not self.screen_bitmap:
return (0, 0)
panel_width, panel_height = self.GetSize()
bitmap_width = self.screen_bitmap.GetWidth()
bitmap_height = self.screen_bitmap.GetHeight()
scale = min(panel_width/bitmap_width, panel_height/bitmap_height)
scaled_width = int(bitmap_width * scale)
scaled_height = int(bitmap_height * scale)
# 计算显示区域的偏移
x_offset = (panel_width - scaled_width) // 2
y_offset = (panel_height - scaled_height) // 2
# 转换坐标
device_x = int((x - x_offset) / scale)
device_y = int((y - y_offset) / scale)
# 确保坐标在设备范围内
device_x = max(0, min(device_x, bitmap_width-1))
device_y = max(0, min(device_y, bitmap_height-1))
return (device_x, device_y)
def on_left_down(self, event):
self.last_mouse_pos = event.GetPosition()
self.is_dragging = True
event.Skip()
def on_left_up(self, event):
if not self.is_dragging:
return
self.is_dragging = False
pos = event.GetPosition()
# 如果鼠标位置没有明显变化,执行点击
if abs(pos[0] - self.last_mouse_pos[0]) < 5 and abs(pos[1] - self.last_mouse_pos[1]) < 5:
device_x, device_y = self.get_device_coordinates(*pos)
self.execute_device_operation(lambda: self.device.click(device_x, device_y))
if self.log_callback:
self.log_callback(f"点击: ({device_x}, {device_y})")
else:
# 执行滑动
start_x, start_y = self.get_device_coordinates(*self.last_mouse_pos)
end_x, end_y = self.get_device_coordinates(*pos)
self.execute_device_operation(lambda: self.device.swipe(start_x, start_y, end_x, end_y))
if self.log_callback:
self.log_callback(f"滑动: ({start_x}, {start_y}) -> ({end_x}, {end_y})")
event.Skip()
def on_motion(self, event):
if not self.is_dragging:
event.Skip()
return
event.Skip()
class DeviceMirrorFrame(wx.Frame):
def __init__(self, device: Device):
super().__init__(None, title="设备镜像", size=(800, 600))
# 创建分割窗口
self.splitter = wx.SplitterWindow(self)
# 创建左侧面板(包含控制区域和日志区域)
self.left_panel = wx.Panel(self.splitter)
left_sizer = wx.BoxSizer(wx.VERTICAL)
# 控制区域
self.control_panel = wx.Panel(self.left_panel)
self.init_control_panel()
left_sizer.Add(self.control_panel, 0, wx.EXPAND | wx.ALL, 5)
# 日志区域
self.log_text = wx.TextCtrl(self.left_panel, style=wx.TE_MULTILINE | wx.TE_READONLY | wx.HSCROLL)
self.log_text.SetBackgroundColour(wx.BLACK)
self.log_text.SetForegroundColour(wx.GREEN)
self.log_text.SetFont(wx.Font(9, wx.FONTFAMILY_TELETYPE, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL))
left_sizer.Add(self.log_text, 1, wx.EXPAND | wx.ALL, 5)
self.left_panel.SetSizer(left_sizer)
# 创建设备画面
self.device_panel = DeviceMirrorPanel(self.splitter, device, self.log)
# 设置分割
self.splitter.SplitVertically(self.left_panel, self.device_panel)
self.splitter.SetMinimumPaneSize(200)
# 保存设备引用
self.device = device
def log(self, message: str):
"""添加日志"""
timestamp = time.strftime("%H:%M:%S", time.localtime())
wx.CallAfter(self.log_text.AppendText, f"[{timestamp}] {message}\n")
def init_control_panel(self):
vbox = wx.BoxSizer(wx.VERTICAL)
# 添加控制按钮
btn_get_resolution = wx.Button(self.control_panel, label="获取分辨率")
btn_get_resolution.Bind(wx.EVT_BUTTON, self.on_get_resolution)
vbox.Add(btn_get_resolution, 0, wx.EXPAND | wx.ALL, 5)
btn_get_orientation = wx.Button(self.control_panel, label="获取设备方向")
btn_get_orientation.Bind(wx.EVT_BUTTON, self.on_get_orientation)
vbox.Add(btn_get_orientation, 0, wx.EXPAND | wx.ALL, 5)
# 启动APP区域
hbox = wx.BoxSizer(wx.HORIZONTAL)
self.package_input = wx.TextCtrl(self.control_panel)
hbox.Add(self.package_input, 1, wx.EXPAND | wx.RIGHT, 5)
btn_launch_app = wx.Button(self.control_panel, label="启动APP")
btn_launch_app.Bind(wx.EVT_BUTTON, self.on_launch_app)
hbox.Add(btn_launch_app, 0)
vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 5)
btn_get_current_app = wx.Button(self.control_panel, label="获取前台APP")
btn_get_current_app.Bind(wx.EVT_BUTTON, self.on_get_current_app)
vbox.Add(btn_get_current_app, 0, wx.EXPAND | wx.ALL, 5)
self.control_panel.SetSizer(vbox)
def on_get_resolution(self, event):
"""获取分辨率"""
try:
width, height = self.device.screen_size
self.log(f"设备分辨率: {width}x{height}")
except Exception as e:
self.log(f"获取分辨率失败: {e}")
def on_get_orientation(self, event):
"""获取设备方向"""
try:
orientation = self.device.detect_orientation()
orientation_text = "横屏" if orientation == "landscape" else "竖屏"
self.log(f"设备方向: {orientation_text}")
except Exception as e:
self.log(f"获取设备方向失败: {e}")
def on_launch_app(self, event):
"""启动APP"""
package_name = self.package_input.GetValue().strip()
if not package_name:
self.log("请输入包名")
return
try:
self.device.launch_app(package_name)
self.log(f"启动APP: {package_name}")
except Exception as e:
self.log(f"启动APP失败: {e}")
def on_get_current_app(self, event):
"""获取前台APP"""
try:
package = self.device.current_package()
if package:
self.log(f"前台APP: {package}")
else:
self.log("获取前台APP失败")
except Exception as e:
self.log(f"获取前台APP失败: {e}")
def on_quit(self, event):
self.device_panel.is_running = False
self.Close()
def show_device_mirror(device: Device):
"""显示设备镜像窗口"""
app = wx.App()
frame = DeviceMirrorFrame(device)
frame.Show()
app.MainLoop()
if __name__ == "__main__":
# 测试代码
from kotonebot.client.implements.adb import AdbImpl
from kotonebot.client.implements.uiautomator2 import UiAutomator2Impl
from adbutils import adb
print("server version:", adb.server_version())
adb.connect("127.0.0.1:5555")
print("devices:", adb.device_list())
d = adb.device_list()[-1]
dd = Device(d)
adb_imp = AdbImpl(dd)
dd._command = adb_imp
dd._touch = adb_imp
dd._screenshot = UiAutomator2Impl(dd)
show_device_mirror(dd)

View File

@ -4,4 +4,5 @@ pyinstaller==6.11.1
twine==6.1.0
dataclasses-json==0.6.7
python-lsp-server==1.12.0
snakeviz==2.2.2
snakeviz==2.2.2
wxpython==4.2.2

View File

@ -6,6 +6,7 @@ scikit-image==0.25.0
thefuzz==0.22.1
# Adb Control
adbutils==2.8.0
uiautomator2==3.2.8
# Visual Debugger
fastapi==0.115.6
uvicorn==0.34.0