feat(core): 引入 Nemu 截图与控制方式

This commit is contained in:
XcantloadX 2025-06-23 13:57:09 +08:00
parent b0e77e2173
commit f0b91814f7
8 changed files with 565 additions and 12 deletions

View File

@ -7,8 +7,10 @@ from typing_extensions import override
from kotonebot import logging
from kotonebot.client import DeviceImpl, Device
from kotonebot.client.device import AndroidDevice
from kotonebot.client.registration import AdbBasedImpl, create_device
from kotonebot.client.implements.adb import AdbImplConfig
from kotonebot.client.implements.adb import AdbImpl, AdbImplConfig, _create_adb_device_base
from kotonebot.client.implements.nemu_ipc import NemuIpcImpl, NemuIpcImplConfig
from kotonebot.util import Countdown, Interval
from .protocol import HostProtocol, Instance, copy_type, AdbHostConfig
@ -78,6 +80,25 @@ class Mumu12Instance(Instance[AdbHostConfig]):
if self.adb_port is None:
raise ValueError("ADB port is not set and is required.")
# 新增对 nemu_ipc 的支持
if impl == 'nemu_ipc':
nemu_path = Mumu12Host._read_install_path()
if not nemu_path:
raise RuntimeError("无法找到 MuMu12 的安装路径。")
nemu_config = NemuIpcImplConfig(mumu_shell_folder=nemu_path, instance_id=int(self.id))
device = AndroidDevice()
nemu_impl = NemuIpcImpl(device, nemu_config)
device._screenshot = nemu_impl
device._touch = nemu_impl
# 组装命令部分 (Adb)
adb_config = AdbImplConfig(addr=f'{self.adb_ip}:{self.adb_port}', timeout=host_config.timeout)
# adb_impl = AdbImpl(device, adb_config)
_d = _create_adb_device_base(adb_config, AdbImpl)
device.commands = _d.commands
return device
# 为 ADB 相关的实现创建配置
if impl in ['adb', 'adb_raw', 'uiautomator2']:
config = AdbImplConfig(
@ -96,10 +117,12 @@ class Mumu12Host(HostProtocol):
@staticmethod
@lru_cache(maxsize=1)
def _read_install_path() -> str | None:
"""
Reads the installation path (DisplayIcon) of MuMu Player 12 from the registry.
r"""
从注册表中读取 MuMu Player 12 的安装路径
:return: The path to the display icon if found, otherwise None.
返回的路径为 shell 文件夹 `F:\Apps\Netease\MuMuPlayer-12.0\shell`
:return: 若找到则返回安装路径否则返回 None
"""
if os.name != 'nt':
return None

View File

@ -3,4 +3,5 @@ from . import adb # noqa: F401
from . import adb_raw # noqa: F401
from . import remote_windows # noqa: F401
from . import uiautomator2 # noqa: F401
from . import windows # noqa: F401
from . import windows # noqa: F401
from . import nemu_ipc # noqa: F401

View File

@ -0,0 +1,8 @@
from .external_renderer_ipc import ExternalRendererIpc
from .nemu_ipc import NemuIpcImpl, NemuIpcImplConfig
__all__ = [
"ExternalRendererIpc",
"NemuIpcImpl",
"NemuIpcImplConfig",
]

View File

@ -0,0 +1,278 @@
import ctypes
import logging
import os
logger = logging.getLogger(__name__)
class NemuIpcIncompatible(RuntimeError):
"""MuMu12 IPC 环境不兼容或 DLL 加载失败"""
class ExternalRendererIpc:
"""对 `external_renderer_ipc.dll` 的轻量封装。
该类仅处理 DLL 加载与原型声明并提供带有类型提示的薄包装方法
方便在其他模块中调用且保持类型安全
"""
def __init__(self, mumu_shell_folder: str):
if os.name != "nt":
raise NemuIpcIncompatible("ExternalRendererIpc only supports Windows.")
self.lib = self.__load_dll(mumu_shell_folder)
self.raise_on_error: bool = True
"""是否在调用 DLL 函数失败时抛出异常。"""
self.__declare_prototypes()
def connect(self, nemu_folder: str, instance_id: int) -> int:
"""
建立连接
API 原型
`int nemu_connect(const wchar_t* path, int index)`
:param nemu_folder: 模拟器安装路径
:param instance_id: 模拟器实例 ID
:return: 成功返回连接 ID失败返回 0
"""
return self.lib.nemu_connect(nemu_folder, instance_id)
def disconnect(self, connect_id: int) -> None:
"""
断开连接
API 原型
`void nemu_disconnect(int handle)`
:param connect_id: 连接 ID
:return: 无返回值
"""
return self.lib.nemu_disconnect(connect_id)
def get_display_id(self, connect_id: int, pkg: str, app_index: int) -> int:
"""
获取指定包的 display id
API 原型
`int nemu_get_display_id(int handle, const char* pkg, int appIndex)`
:param connect_id: 连接 ID
:param pkg: 包名
:param app_index: 多开应用索引
:return: <0 表示失败>=0 表示有效 display id
"""
return self.lib.nemu_get_display_id(connect_id, pkg.encode('utf-8'), app_index)
def capture_display(
self,
connect_id: int,
display_id: int,
buf_len: int,
width_ptr: ctypes.c_void_p,
height_ptr: ctypes.c_void_p,
buffer_ptr: ctypes.c_void_p,
) -> int:
"""
截取指定显示屏内容
API 原型
`int nemu_capture_display(int handle, unsigned int displayid, int buffer_size, int *width, int *height, unsigned char* pixels)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param buf_len: 缓冲区长度字节
:param width_ptr: 用于接收宽度的指针ctypes.c_void_p/int 指针
:param height_ptr: 用于接收高度的指针ctypes.c_void_p/int 指针
:param buffer_ptr: 用于接收像素数据的指针ctypes.c_void_p/unsigned char* 指针
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_capture_display(
connect_id,
display_id,
buf_len,
width_ptr,
height_ptr,
buffer_ptr,
)
def input_text(self, connect_id: int, text: str) -> int:
"""
输入文本
API 原型
`int nemu_input_text(int handle, int size, const char* buf)`
:param connect_id: 连接 ID
:param text: 输入文本utf-8
:return: 0 表示成功>0 表示失败
"""
buf = text.encode('utf-8')
return self.lib.nemu_input_text(connect_id, len(buf), buf)
def input_touch_down(self, connect_id: int, display_id: int, x: int, y: int) -> int:
"""
发送触摸按下事件
API 原型
`int nemu_input_event_touch_down(int handle, int displayid, int x_point, int y_point)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param x: 触摸点 X 坐标
:param y: 触摸点 Y 坐标
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_touch_down(connect_id, display_id, x, y)
def input_touch_up(self, connect_id: int, display_id: int) -> int:
"""
发送触摸抬起事件
API 原型
`int nemu_input_event_touch_up(int handle, int displayid)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_touch_up(connect_id, display_id)
def input_key_down(self, connect_id: int, display_id: int, key_code: int) -> int:
"""
发送按键按下事件
API 原型
`int nemu_input_event_key_down(int handle, int displayid, int key_code)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param key_code: 按键码
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_key_down(connect_id, display_id, key_code)
def input_key_up(self, connect_id: int, display_id: int, key_code: int) -> int:
"""
发送按键抬起事件
API 原型
`int nemu_input_event_key_up(int handle, int displayid, int key_code)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param key_code: 按键码
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_key_up(connect_id, display_id, key_code)
def input_finger_touch_down(self, connect_id: int, display_id: int, finger_id: int, x: int, y: int) -> int:
"""
多指触摸按下
API 原型
`int nemu_input_event_finger_touch_down(int handle, int displayid, int finger_id, int x_point, int y_point)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param finger_id: 手指编号1-10
:param x: 触摸点 X 坐标
:param y: 触摸点 Y 坐标
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_finger_touch_down(connect_id, display_id, finger_id, x, y)
def input_finger_touch_up(self, connect_id: int, display_id: int, finger_id: int) -> int:
"""
多指触摸抬起
API 原型
`int nemu_input_event_finger_touch_up(int handle, int displayid, int slot_id)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param finger_id: 手指编号1-10
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_finger_touch_up(connect_id, display_id, finger_id)
# ------------------------------------------------------------------
# 内部工具
# ------------------------------------------------------------------
def __load_dll(self, mumu_shell_folder: str) -> ctypes.CDLL:
"""尝试多条路径加载 DLL。"""
candidate_paths = [
os.path.join(mumu_shell_folder, "sdk", "external_renderer_ipc.dll"),
os.path.join(
mumu_shell_folder,
"nx_device",
"12.0",
"sdk",
"external_renderer_ipc.dll",
),
]
for p in candidate_paths:
if not os.path.exists(p):
continue
try:
return ctypes.CDLL(p)
except OSError as e: # pragma: no cover
logger.warning("Failed to load DLL (%s): %s", p, e)
raise NemuIpcIncompatible("external_renderer_ipc.dll not found or failed to load.")
def __declare_prototypes(self) -> None:
"""声明 DLL 函数原型,确保 ctypes 类型安全。"""
# 连接 / 断开
self.lib.nemu_connect.argtypes = [ctypes.c_wchar_p, ctypes.c_int]
self.lib.nemu_connect.restype = ctypes.c_int
self.lib.nemu_disconnect.argtypes = [ctypes.c_int]
self.lib.nemu_disconnect.restype = None
# 获取 display id
self.lib.nemu_get_display_id.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_int]
self.lib.nemu_get_display_id.restype = ctypes.c_int
# 截图
self.lib.nemu_capture_display.argtypes = [
ctypes.c_int,
ctypes.c_uint,
ctypes.c_int,
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p,
]
self.lib.nemu_capture_display.restype = ctypes.c_int
# 输入文本
self.lib.nemu_input_text.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_char_p]
self.lib.nemu_input_text.restype = ctypes.c_int
# 触摸
self.lib.nemu_input_event_touch_down.argtypes = [
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
]
self.lib.nemu_input_event_touch_down.restype = ctypes.c_int
self.lib.nemu_input_event_touch_up.argtypes = [ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_touch_up.restype = ctypes.c_int
# 按键
self.lib.nemu_input_event_key_down.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_key_down.restype = ctypes.c_int
self.lib.nemu_input_event_key_up.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_key_up.restype = ctypes.c_int
# 多指触摸
self.lib.nemu_input_event_finger_touch_down.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_finger_touch_down.restype = ctypes.c_int
self.lib.nemu_input_event_finger_touch_up.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_finger_touch_up.restype = ctypes.c_int
logger.debug("DLL function prototypes declared")

View File

@ -0,0 +1,237 @@
import os
import ctypes
import logging
from dataclasses import dataclass
from time import sleep
from typing_extensions import override
import cv2
import numpy as np
from cv2.typing import MatLike
from ...device import AndroidDevice, Device
from ...protocol import Touchable, Screenshotable
from ...registration import register_impl, ImplConfig
from .external_renderer_ipc import ExternalRendererIpc
logger = logging.getLogger(__name__)
class NemuIpcIncompatible(Exception):
"""MuMu12 版本过低或 dll 不兼容"""
pass
class NemuIpcError(Exception):
"""调用 IPC 过程中发生错误"""
pass
@dataclass
class NemuIpcImplConfig(ImplConfig):
"""nemu_ipc 设备实现的配置模型"""
mumu_shell_folder: str
instance_id: int
class NemuIpcImpl(Touchable, Screenshotable):
"""
利用 MuMu12 提供的 external_renderer_ipc.dll 进行截图与触摸控制
"""
def __init__(self, device: Device, config: NemuIpcImplConfig):
self.device = device
self.config = config
self.__width: int = 0
self.__height: int = 0
self.__connected: bool = False
self._connect_id: int = 0
self.display_id: int = 0
"""
显示器 ID`0` 表示主显示器
如果没有启用后台保活功能一般为主显示器
"""
self.nemu_folder = os.path.abspath(os.path.join(config.mumu_shell_folder, os.pardir))
# --------------------------- DLL 封装 ---------------------------
self._ipc = ExternalRendererIpc(config.mumu_shell_folder)
logger.info("ExternalRendererIpc initialized and DLL loaded")
@property
def width(self) -> int:
"""
屏幕宽度
若为 0表示未连接或未获取到分辨率
"""
return self.__width
@property
def height(self) -> int:
"""
屏幕高度
若为 0表示未连接或未获取到分辨率
"""
return self.__height
@property
def connected(self) -> bool:
"""是否已连接。"""
return self.__connected
# ------------------------------------------------------------------
# 基础控制
# ------------------------------------------------------------------
def _ensure_connected(self) -> None:
if not self.__connected:
self.connect()
def connect(self) -> None:
"""连接模拟器。"""
if self.__connected:
return
connect_id = self._ipc.connect(self.nemu_folder, self.config.instance_id)
if connect_id == 0:
raise NemuIpcError("nemu_connect failed, please check if the emulator is running and the instance ID is correct.")
self._connect_id = connect_id
self.__connected = True
logger.debug("NemuIpc connected, connect_id=%d", connect_id)
def disconnect(self) -> None:
"""断开连接。"""
if not self.__connected:
return
self._ipc.disconnect(self._connect_id)
self.__connected = False
self._connect_id = 0
logger.debug("NemuIpc disconnected.")
# ------------------------------------------------------------------
# Screenshotable 接口实现
# ------------------------------------------------------------------
@property
def screen_size(self) -> tuple[int, int]:
"""获取屏幕分辨率。"""
if self.__width == 0 or self.__height == 0:
self._query_resolution()
if self.__width == 0 or self.__height == 0:
raise NemuIpcError("Screen resolution not obtained, please connect to the emulator first.")
return self.__width, self.__height
@override
def detect_orientation(self):
if self.__width > self.__height:
return "landscape"
if self.__height > self.__width:
return "portrait"
return None
@override
def screenshot(self) -> MatLike:
self._ensure_connected()
# 确保分辨率已知
_ = self.screen_size
length = self.__width * self.__height * 4 # RGBA
buf_type = ctypes.c_ubyte * length
buffer = buf_type()
w_ptr = ctypes.pointer(ctypes.c_int(self.__width))
h_ptr = ctypes.pointer(ctypes.c_int(self.__height))
ret = self._ipc.capture_display(
self._connect_id,
self.display_id,
length,
ctypes.cast(w_ptr, ctypes.c_void_p),
ctypes.cast(h_ptr, ctypes.c_void_p),
ctypes.cast(buffer, ctypes.c_void_p),
)
if ret != 0:
raise NemuIpcError(f"nemu_capture_display screenshot failed, error code={ret}")
# 读入并转换数据
img = np.ctypeslib.as_array(buffer).reshape((self.__height, self.__width, 4))
# RGBA -> BGR
img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
cv2.flip(img, 0, dst=img)
return img
# --------------------------- 内部工具 -----------------------------
def _query_resolution(self) -> None:
"""调用 capture 接口并返回宽高信息,不取像素数据。"""
self._ensure_connected()
w_ptr = ctypes.pointer(ctypes.c_int(0))
h_ptr = ctypes.pointer(ctypes.c_int(0))
ret = self._ipc.capture_display(
self._connect_id,
self.display_id,
0,
ctypes.cast(w_ptr, ctypes.c_void_p),
ctypes.cast(h_ptr, ctypes.c_void_p),
ctypes.c_void_p(),
)
if ret != 0:
raise NemuIpcError(f"nemu_capture_display 查询分辨率失败,错误码={ret}")
self.__width = w_ptr.contents.value
self.__height = h_ptr.contents.value
logger.debug("Parsed resolution %dx%d", self.__width, self.__height)
# ------------------------------------------------------------------
# Touchable 接口实现
# ------------------------------------------------------------------
@override
def click(self, x: int, y: int) -> None:
self._ensure_connected()
self._ipc.input_touch_down(self._connect_id, self.display_id, x, y)
sleep(0.01)
self._ipc.input_touch_up(self._connect_id, self.display_id)
@override
def swipe(
self,
x1: int,
y1: int,
x2: int,
y2: int,
duration: float | None = None,
) -> None:
self._ensure_connected()
duration = duration or 0.3
steps = max(int(duration / 0.01), 2)
xs = np.linspace(x1, x2, steps, dtype=int)
ys = np.linspace(y1, y2, steps, dtype=int)
# 按下第一点
self._ipc.input_touch_down(self._connect_id, self.display_id, xs[0], ys[0])
sleep(0.01)
# 中间移动
for px, py in zip(xs[1:-1], ys[1:-1]):
self._ipc.input_touch_down(self._connect_id, self.display_id, px, py)
sleep(0.01)
# 最终抬起
self._ipc.input_touch_up(self._connect_id, self.display_id)
sleep(0.01)
# ------------------------------------------------------------------
# 工厂方法
# ------------------------------------------------------------------
@register_impl("nemu_ipc", config_model=NemuIpcImplConfig)
def create_nemu_ipc_device(config: NemuIpcImplConfig):
"""创建一个 AndroidDevice并挂载 NemuIpcImpl。"""
device = AndroidDevice()
impl = NemuIpcImpl(device, config)
device._touch = impl
device._screenshot = impl
return device

View File

@ -7,9 +7,10 @@ if TYPE_CHECKING:
from .implements.adb import AdbImplConfig
from .implements.remote_windows import RemoteWindowsImplConfig
from .implements.windows import WindowsImplConfig
from .implements.nemu_ipc import NemuIpcImplConfig
AdbBasedImpl = Literal['adb', 'adb_raw', 'uiautomator2']
DeviceImpl = str | AdbBasedImpl | Literal['windows', 'remote_windows']
DeviceImpl = str | AdbBasedImpl | Literal['windows', 'remote_windows', 'nemu_ipc']
# --- 核心类型定义 ---
@ -42,8 +43,8 @@ def register_impl(name: str, config_model: Type[ImplConfig] | None = None) -> Ca
:param config_model: (可选) 与该实现关联的 dataclass 配置模型
"""
def decorator(creator_func: Callable[..., Device]) -> Callable[..., Device]:
if name in DEVICE_CREATORS:
raise ImplRegistrationError(f"实现 '{name}' 已被注册。")
# if name in DEVICE_CREATORS:
# raise ImplRegistrationError(f"实现 '{name}' 已被注册。")
DEVICE_CREATORS[name] = (creator_func, config_model)
return creator_func
return decorator
@ -61,6 +62,10 @@ def create_device(impl_name: Literal['remote_windows'], config: 'RemoteWindowsIm
@overload
def create_device(impl_name: AdbBasedImpl, config: 'AdbImplConfig') -> Device: ...
# 新增 nemu_ipc overload
@overload
def create_device(impl_name: Literal['nemu_ipc'], config: 'NemuIpcImplConfig') -> Device: ...
# 函数的实际实现
def create_device(impl_name: DeviceImpl, config: ImplConfig | None = None) -> Device:
"""

View File

@ -555,7 +555,7 @@ class KotoneBotUI:
# choices = ['windows', 'remote_windows']
# else: # Mumu, Leidian, Custom
# choices = ['adb', 'adb_raw', 'uiautomator2']
choices = ['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows']
choices = ['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows', 'nemu_ipc']
if impl_value not in choices:
new_value = choices[0]
else:
@ -661,7 +661,7 @@ class KotoneBotUI:
# choices = ['adb', 'adb_raw', 'uiautomator2']
# else:
# raise ValueError(f'Unsupported backend type: {type_in_config}')
choices = ['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows']
choices = ['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows', 'nemu_ipc']
screenshot_impl = gr.Dropdown(
choices=choices,
value=self.current_config.backend.screenshot_impl,

View File

@ -9,6 +9,8 @@ from typing_extensions import override
import cv2
from kotonebot.client.implements.nemu_ipc import NemuIpcImplConfig
from ...client import Device
from kotonebot.ui import user
from kotonebot import KotoneBot
@ -228,10 +230,9 @@ class Kaa(KotoneBot):
else:
raise ValueError(f"Impl of '{impl_name}' is not supported on DMM.")
return self.backend_instance.create_device(impl_name, host_conf)
# 统一处理所有基于 ADB 的后端
elif isinstance(self.backend_instance, (CustomInstance, Mumu12Instance, LeidianInstance)):
if impl_name in ['adb', 'adb_raw', 'uiautomator2']:
if impl_name in ['adb', 'adb_raw', 'uiautomator2'] or (impl_name == 'nemu_ipc' and isinstance(self.backend_instance, Mumu12Instance)):
host_conf = AdbHostConfig(timeout=180)
return self.backend_instance.create_device(impl_name, host_conf)
else: