Merge branch 'dev'

This commit is contained in:
XcantloadX 2025-06-28 14:41:29 +08:00
commit 9935087753
29 changed files with 1303 additions and 337 deletions

8
.vscode/launch.json vendored
View File

@ -24,6 +24,14 @@
],
// "module": "${command:extension.commandvariable.file.relativeDirDots}.${fileBasenameNoExtension}",
},
{
"name": "Python: Current Module(Without context)",
"type": "python",
"request": "launch",
"console": "integratedTerminal",
"module": "${command:extension.commandvariable.file.relativeDirDots}.${fileBasenameNoExtension}",
},
{
"name": "KotonebotDebug: Current Module",

View File

@ -76,6 +76,9 @@ kaa 的开发主要用到了以下开源项目:
](https://github.com/AllenHeartcore/GkmasObjectManager):用于提取游戏图像资源,以 GPLv3 协议开源。
* [gakumasu-diff](https://github.com/vertesan/gakumasu-diff):游戏数据。
kaa 的开发还参考了以下开源项目:
* [EmulatorExtras](https://github.com/MaaXYZ/EmulatorExtras)MuMu 与雷电模拟器的截图与控制接口定义。
* [blue_archive_auto_script](https://github.com/pur1fying/blue_archive_auto_script)MuMu 与雷电模拟器的截图与控制接口的 Python 实现,以及各模拟器的控制实现。
## 免责声明
**请在使用本项目前仔细阅读以下内容。使用本脚本将带来包括但不限于账号被封禁的风险。**

View File

@ -46,12 +46,10 @@ from kotonebot.backend.color import (
from kotonebot.backend.ocr import (
Ocr, OcrResult, OcrResultList, jp, en, StringMatchFunction
)
from kotonebot.client.registration import AdbBasedImpl, create_device
from kotonebot.config.manager import load_config, save_config
from kotonebot.config.base_config import UserConfig
from kotonebot.backend.core import Image, HintBox
from kotonebot.errors import KotonebotWarning
from kotonebot.client import DeviceImpl
from kotonebot.backend.preprocessor import PreprocessorProtocol
from kotonebot.primitives import Rect

View File

@ -1,11 +1,10 @@
from .device import Device
from .registration import create_device, DeviceImpl
from .registration import DeviceImpl
# 确保所有实现都被注册
from . import implements # noqa: F401
__all__ = [
'Device',
'create_device',
'DeviceImpl',
]

View File

@ -9,6 +9,7 @@ from cv2.typing import MatLike
from adbutils._device import AdbDevice as AdbUtilsDevice
from ..backend.debug import result
from ..errors import UnscalableResolutionError
from kotonebot.backend.core import HintBox
from kotonebot.primitives import Rect, Point, is_point
from .protocol import ClickableObjectProtocol, Commandable, Touchable, Screenshotable, AndroidCommandable, WindowsCommandable
@ -78,6 +79,31 @@ class Device:
"""
设备平台名称
"""
self.target_resolution: tuple[int, int] | None = None
"""
目标分辨率
若设置则在截图点击滑动等时会缩放到目标分辨率
仅支持等比例缩放若无法等比例缩放则会抛出异常 `UnscalableResolutionError`
"""
self.match_rotation: bool = True
"""
分辨率缩放是否自动匹配旋转
当目标与真实分辨率的宽高比不一致时是否允许通过旋转交换宽高后再进行匹配
True 则忽略方向差异只要宽高比一致就视为可缩放False 则必须匹配旋转
例如当目标分辨率为 1920x1080而真实分辨率为 1080x1920
``match_rotation`` True 则认为可以缩放 False 则会抛出异常
"""
self.aspect_ratio_tolerance: float = 0.1
"""
宽高比容差阈值
判断两分辨率宽高比差异是否接受的阈值
该值越小对比例一致性的要求越严格
默认为 0.1 10% 容差
"""
@property
def adb(self) -> AdbUtilsDevice:
@ -89,6 +115,50 @@ class Device:
def adb(self, value: AdbUtilsDevice) -> None:
self._adb = value
def _scale_pos_real_to_target(self, real_x: int, real_y: int) -> tuple[int, int]:
"""将真实屏幕坐标缩放到目标逻辑坐标"""
if self.target_resolution is None:
return real_x, real_y
real_w, real_h = self.screen_size
target_w, target_h = self.target_resolution
# 校验分辨率是否可缩放并获取调整后的目标分辨率
adjusted_target_w, adjusted_target_h = self.__assert_scalable((real_w, real_h), (target_w, target_h))
scale_w = adjusted_target_w / real_w
scale_h = adjusted_target_h / real_h
return int(real_x * scale_w), int(real_y * scale_h)
def _scale_pos_target_to_real(self, target_x: int, target_y: int) -> tuple[int, int]:
"""将目标逻辑坐标缩放到真实屏幕坐标"""
if self.target_resolution is None:
return target_x, target_y # 输入坐标已是真实坐标
real_w, real_h = self.screen_size
target_w, target_h = self.target_resolution
# 校验分辨率是否可缩放并获取调整后的目标分辨率
adjusted_target_w, adjusted_target_h = self.__assert_scalable((real_w, real_h), (target_w, target_h))
scale_to_real_w = real_w / adjusted_target_w
scale_to_real_h = real_h / adjusted_target_h
return int(target_x * scale_to_real_w), int(target_y * scale_to_real_h)
def __scale_image (self, img: MatLike) -> MatLike:
if self.target_resolution is None:
return img
target_w, target_h = self.target_resolution
h, w = img.shape[:2]
# 校验分辨率是否可缩放并获取调整后的目标分辨率
adjusted_target = self.__assert_scalable((w, h), (target_w, target_h))
return cv2.resize(img, adjusted_target)
@overload
def click(self) -> None:
"""
@ -161,7 +231,12 @@ class Device:
logger.debug(f"Executing click hook before: ({x}, {y})")
x, y = hook(x, y)
logger.debug(f"Click hook before result: ({x}, {y})")
logger.debug(f"Click: {x}, {y}")
if self.target_resolution is not None:
# 输入坐标为逻辑坐标,需要转换为真实坐标
real_x, real_y = self._scale_pos_target_to_real(x, y)
else:
real_x, real_y = x, y
logger.debug(f"Click: {x}, {y}%s", f"(Physical: {real_x}, {real_y})" if self.target_resolution is not None else "")
from ..backend.context import ContextStackVars
if ContextStackVars.current() is not None:
image = ContextStackVars.ensure_current()._screenshot
@ -169,9 +244,11 @@ class Device:
image = np.array([])
if image is not None and image.size > 0:
cv2.circle(image, (x, y), 10, (0, 0, 255), -1)
message = f"point: ({x}, {y})"
message = f"Point: ({x}, {y})"
if self.target_resolution is not None:
message += f" physical: ({real_x}, {real_y})"
result("device.click", image, message)
self._touch.click(x, y)
self._touch.click(real_x, real_y)
def __click_point_tuple(self, point: Point) -> None:
self.click(point[0], point[1])
@ -232,6 +309,10 @@ class Device:
"""
滑动屏幕
"""
if self.target_resolution is not None:
# 输入坐标为逻辑坐标,需要转换为真实坐标
x1, y1 = self._scale_pos_target_to_real(x1, y1)
x2, y2 = self._scale_pos_target_to_real(x2, y2)
self._touch.swipe(x1, y1, x2, y2, duration)
def swipe_scaled(self, x1: float, y1: float, x2: float, y2: float, duration: float|None = None) -> None:
@ -260,6 +341,7 @@ class Device:
img = self.screenshot_raw()
if self.screenshot_hook_after is not None:
img = self.screenshot_hook_after(img)
img = self.__scale_image(img)
return img
def screenshot_raw(self) -> MatLike:
@ -296,8 +378,15 @@ class Device:
`self.orientation` 属性默认为竖屏如果需要自动检测
调用 `self.detect_orientation()` 方法
如果已知方向也可以直接设置 `self.orientation` 属性
即使设置了 `self.target_resolution`返回的分辨率仍然是真实分辨率
"""
return self._screenshot.screen_size
size = self._screenshot.screen_size
if self.orientation == 'landscape':
size = sorted(size, reverse=True)
else:
size = sorted(size, reverse=False)
return size[0], size[1]
def detect_orientation(self) -> Literal['portrait', 'landscape'] | None:
"""
@ -307,6 +396,68 @@ class Device:
"""
return self._screenshot.detect_orientation()
def __aspect_ratio_compatible(self, src_size: tuple[int, int], tgt_size: tuple[int, int]) -> bool:
"""
判断两个尺寸在宽高比意义上是否兼容
``self.match_rotation`` True忽略方向长边/短边进行比较
判断标准由 ``self.aspect_ratio_tolerance`` 决定默认 0.1
"""
src_w, src_h = src_size
tgt_w, tgt_h = tgt_size
# 尺寸必须为正
if src_w <= 0 or src_h <= 0:
raise ValueError(f"Source size dimensions must be positive for scaling: {src_size}")
if tgt_w <= 0 or tgt_h <= 0:
raise ValueError(f"Target size dimensions must be positive for scaling: {tgt_size}")
tolerant = self.aspect_ratio_tolerance
# 直接比较宽高比
if abs((tgt_w / src_w) - (tgt_h / src_h)) <= tolerant:
return True
# 尝试忽略方向差异
if self.match_rotation:
ratio_src = max(src_w, src_h) / min(src_w, src_h)
ratio_tgt = max(tgt_w, tgt_h) / min(tgt_w, tgt_h)
return abs(ratio_src - ratio_tgt) <= tolerant
return False
def __assert_scalable(self, source: tuple[int, int], target: tuple[int, int]) -> tuple[int, int]:
"""
校验分辨率是否可缩放并返回调整后的目标分辨率
match_rotation True 且源分辨率与目标分辨率的旋转方向不一致时
自动交换目标分辨率的宽高使其与源分辨率的方向保持一致
:param src_size: 源分辨率 (width, height)
:param tgt_size: 目标分辨率 (width, height)
:return: 调整后的目标分辨率 (width, height)
:raises UnscalableResolutionError: 若宽高比不兼容
"""
# 智能调整目标分辨率方向
adjusted_tgt_size = target
if self.match_rotation:
src_w, src_h = source
tgt_w, tgt_h = target
# 判断源分辨率和目标分辨率的方向
src_is_landscape = src_w > src_h
tgt_is_landscape = tgt_w > tgt_h
# 如果方向不一致,交换目标分辨率的宽高
if src_is_landscape != tgt_is_landscape:
adjusted_tgt_size = (tgt_h, tgt_w)
# 校验调整后的分辨率是否兼容
if not self.__aspect_ratio_compatible(source, adjusted_tgt_size):
raise UnscalableResolutionError(target, source)
return adjusted_tgt_size
class AndroidDevice(Device):
def __init__(self, adb_connection: AdbUtilsDevice | None = None) -> None:

View File

@ -0,0 +1,94 @@
from abc import ABC
from typing import Any, Literal, TypeGuard, TypeVar, get_args
from typing_extensions import assert_never
from adbutils import adb
from adbutils._device import AdbDevice
from kotonebot import logging
from kotonebot.client.device import AndroidDevice
from .protocol import Instance, AdbHostConfig, Device
logger = logging.getLogger(__name__)
AdbRecipes = Literal['adb', 'adb_raw', 'uiautomator2']
def is_adb_recipe(recipe: Any) -> TypeGuard[AdbRecipes]:
return recipe in get_args(AdbRecipes)
def connect_adb(
ip: str,
port: int,
connect: bool = True,
disconnect: bool = True,
timeout: float = 180,
device_serial: str | None = None
) -> AdbDevice:
"""
创建 ADB 连接
"""
if disconnect:
logger.debug('adb disconnect %s:%d', ip, port)
adb.disconnect(f'{ip}:{port}')
if connect:
logger.debug('adb connect %s:%d', ip, port)
result = adb.connect(f'{ip}:{port}')
if 'cannot connect to' in result:
raise ValueError(result)
serial = device_serial or f'{ip}:{port}'
logger.debug('adb wait for %s', serial)
adb.wait_for(serial, timeout=timeout)
devices = adb.device_list()
logger.debug('adb device_list: %s', devices)
d = [d for d in devices if d.serial == serial]
if len(d) == 0:
raise ValueError(f"Device {serial} not found")
d = d[0]
return d
class CommonAdbCreateDeviceMixin(ABC):
"""
通用 ADB 创建设备的 Mixin
Mixin 定义了创建 ADB 设备的通用接口
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
# 下面的属性只是为了让类型检查通过,无实际实现
self.adb_ip: str
self.adb_port: int
self.adb_name: str
def create_device(self, recipe: AdbRecipes, config: AdbHostConfig) -> Device:
"""
创建 ADB 设备
"""
connection = connect_adb(
self.adb_ip,
self.adb_port,
connect=True,
disconnect=True,
timeout=config.timeout,
device_serial=self.adb_name
)
d = AndroidDevice(connection)
match recipe:
case 'adb':
from kotonebot.client.implements.adb import AdbImpl
impl = AdbImpl(connection)
d._screenshot = impl
d._touch = impl
d.commands = impl
case 'adb_raw':
from kotonebot.client.implements.adb_raw import AdbRawImpl
impl = AdbRawImpl(connection)
d._screenshot = impl
d._touch = impl
d.commands = impl
case 'uiautomator2':
from kotonebot.client.implements.uiautomator2 import UiAutomator2Impl
from kotonebot.client.implements.adb import AdbImpl
impl = UiAutomator2Impl(connection)
d._screenshot = impl
d._touch = impl
d.commands = AdbImpl(connection)
case _:
assert_never(f'Unsupported ADB recipe: {recipe}')
return d

View File

@ -1,22 +1,21 @@
import os
import subprocess
from psutil import process_iter
from .protocol import Instance, AdbHostConfig
from typing import ParamSpec, TypeVar, cast
from .protocol import Instance, AdbHostConfig, HostProtocol
from typing import ParamSpec, TypeVar
from typing_extensions import override
from kotonebot import logging
from kotonebot.client import DeviceImpl
from kotonebot.client.device import Device
from kotonebot.client.registration import AdbBasedImpl, create_device
from kotonebot.client.implements.adb import AdbImplConfig
from kotonebot.client import Device
from .adb_common import AdbRecipes, CommonAdbCreateDeviceMixin
logger = logging.getLogger(__name__)
CustomRecipes = AdbRecipes
P = ParamSpec('P')
T = TypeVar('T')
class CustomInstance(Instance[AdbHostConfig]):
class CustomInstance(CommonAdbCreateDeviceMixin, Instance[AdbHostConfig]):
def __init__(self, exe_path: str | None, emulator_args: str = "", *args, **kwargs):
super().__init__(*args, **kwargs)
self.exe_path: str | None = exe_path
@ -69,24 +68,12 @@ class CustomInstance(Instance[AdbHostConfig]):
pass
@override
def create_device(self, impl: DeviceImpl, host_config: AdbHostConfig) -> Device:
def create_device(self, impl: CustomRecipes, host_config: AdbHostConfig) -> Device:
"""为自定义实例创建 Device。"""
if self.adb_port is None:
raise ValueError("ADB port is not set and is required.")
# 为 ADB 相关的实现创建配置
if impl in ['adb', 'adb_raw', 'uiautomator2']:
config = AdbImplConfig(
addr=f'{self.adb_ip}:{self.adb_port}',
connect=True,
disconnect=True,
device_serial=self.adb_name,
timeout=host_config.timeout
)
impl = cast(AdbBasedImpl, impl) # make pylance happy
return create_device(impl, config)
else:
raise ValueError(f'Unsupported device implementation for Custom: {impl}')
return super().create_device(impl, host_config)
def __repr__(self) -> str:
return f'CustomInstance(#{self.id}# at "{self.exe_path}" with {self.adb_ip}:{self.adb_port})'
@ -99,6 +86,25 @@ def _type_check(ins: Instance) -> CustomInstance:
def create(exe_path: str | None, adb_ip: str, adb_port: int, adb_name: str | None, emulator_args: str = "") -> CustomInstance:
return CustomInstance(exe_path, emulator_args=emulator_args, id='custom', name='Custom', adb_ip=adb_ip, adb_port=adb_port, adb_name=adb_name)
class CustomHost(HostProtocol[CustomRecipes]):
@staticmethod
def installed() -> bool:
# Custom instances don't have a specific installation requirement
return True
@staticmethod
def list() -> list[Instance]:
# Custom instances are created manually, not discovered
return []
@staticmethod
def query(*, id: str) -> Instance | None:
# Custom instances are created manually, not discovered
return None
@staticmethod
def recipes() -> 'list[CustomRecipes]':
return ['adb', 'adb_raw', 'uiautomator2']
if __name__ == '__main__':
ins = create(r'C:\Program Files\BlueStacks_nxt\HD-Player.exe', '127.0.0.1', 5555, '**emulator-name**')

View File

@ -1,18 +1,17 @@
import os
import subprocess
from typing import cast
from typing import Literal
from functools import lru_cache
from typing_extensions import override
from kotonebot import logging
from kotonebot.client import DeviceImpl
from kotonebot.client.device import Device
from kotonebot.client.registration import AdbBasedImpl, create_device
from kotonebot.client.implements.adb import AdbImplConfig
from kotonebot.client import Device
from kotonebot.util import Countdown, Interval
from .protocol import HostProtocol, Instance, copy_type, AdbHostConfig
from .adb_common import AdbRecipes, CommonAdbCreateDeviceMixin
logger = logging.getLogger(__name__)
LeidianRecipes = AdbRecipes
if os.name == 'nt':
from ...interop.win.reg import read_reg
@ -21,7 +20,7 @@ else:
"""Stub for read_reg on non-Windows platforms."""
return default
class LeidianInstance(Instance[AdbHostConfig]):
class LeidianInstance(CommonAdbCreateDeviceMixin, Instance[AdbHostConfig]):
@copy_type(Instance.__init__)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -64,35 +63,23 @@ class LeidianInstance(Instance[AdbHostConfig]):
it = Interval(5)
while not cd.expired() and not self.running():
it.wait()
self.refresh()
if not self.running():
raise TimeoutError(f'Leidian instance "{self.name}" is not available.')
@override
def running(self) -> bool:
result = LeidianHost._invoke_manager(['isrunning', '--index', str(self.index)])
return result.strip() == 'running'
return self.is_running
@override
def create_device(self, impl: DeviceImpl, host_config: AdbHostConfig) -> Device:
def create_device(self, impl: LeidianRecipes, host_config: AdbHostConfig) -> Device:
"""为雷电模拟器实例创建 Device。"""
if self.adb_port is None:
raise ValueError("ADB port is not set and is required.")
# 为 ADB 相关的实现创建配置
if impl in ['adb', 'adb_raw', 'uiautomator2']:
config = AdbImplConfig(
addr=f'{self.adb_ip}:{self.adb_port}',
connect=False, # 雷电模拟器不需要 adb connect
disconnect=False,
device_serial=self.adb_name,
timeout=host_config.timeout
)
impl = cast(AdbBasedImpl, impl) # make pylance happy
return create_device(impl, config)
else:
raise ValueError(f'Unsupported device implementation for Leidian: {impl}')
return super().create_device(impl, host_config)
class LeidianHost(HostProtocol):
class LeidianHost(HostProtocol[LeidianRecipes]):
@staticmethod
@lru_cache(maxsize=1)
def _read_install_path() -> str | None:
@ -197,6 +184,10 @@ class LeidianHost(HostProtocol):
return instance
return None
@staticmethod
def recipes() -> 'list[LeidianRecipes]':
return ['adb', 'adb_raw', 'uiautomator2']
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
print(LeidianHost._read_install_path())

View File

@ -1,18 +1,19 @@
from dataclasses import dataclass
import os
import json
import subprocess
from functools import lru_cache
from typing import Any, cast
from typing import Any, Literal, overload
from typing_extensions import override
from kotonebot import logging
from kotonebot.client import DeviceImpl, Device
from kotonebot.client.registration import AdbBasedImpl, create_device
from kotonebot.client.implements.adb import AdbImplConfig
from kotonebot.client import Device
from kotonebot.client.device import AndroidDevice
from kotonebot.client.implements.adb import AdbImpl
from kotonebot.client.implements.nemu_ipc import NemuIpcImpl, NemuIpcImplConfig
from kotonebot.util import Countdown, Interval
from .protocol import HostProtocol, Instance, copy_type, AdbHostConfig
logger = logging.getLogger(__name__)
from .adb_common import AdbRecipes, CommonAdbCreateDeviceMixin, connect_adb, is_adb_recipe
if os.name == 'nt':
from ...interop.win.reg import read_reg
@ -21,7 +22,21 @@ else:
"""Stub for read_reg on non-Windows platforms."""
return default
class Mumu12Instance(Instance[AdbHostConfig]):
logger = logging.getLogger(__name__)
MuMu12Recipes = AdbRecipes | Literal['nemu_ipc']
@dataclass
class MuMu12HostConfig(AdbHostConfig):
"""nemu_ipc 能力的配置模型。"""
display_id: int | None = 0
"""目标显示器 ID默认为 0主显示器。若为 None 且设置了 target_package_name则自动获取对应的 display_id。"""
target_package_name: str | None = None
"""目标应用包名,用于自动获取 display_id。"""
app_index: int = 0
"""多开应用索引,传给 get_display_id 方法。"""
class Mumu12Instance(CommonAdbCreateDeviceMixin, Instance[MuMu12HostConfig]):
@copy_type(Instance.__init__)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -72,34 +87,58 @@ class Mumu12Instance(Instance[AdbHostConfig]):
def running(self) -> bool:
return self.is_android_started
@overload
def create_device(self, recipe: Literal['nemu_ipc'], host_config: MuMu12HostConfig) -> Device: ...
@overload
def create_device(self, recipe: AdbRecipes, host_config: AdbHostConfig) -> Device: ...
@override
def create_device(self, impl: DeviceImpl, host_config: AdbHostConfig) -> Device:
def create_device(self, recipe: MuMu12Recipes, host_config: MuMu12HostConfig | AdbHostConfig) -> Device:
"""为MuMu12模拟器实例创建 Device。"""
if self.adb_port is None:
raise ValueError("ADB port is not set and is required.")
# 为 ADB 相关的实现创建配置
if impl in ['adb', 'adb_raw', 'uiautomator2']:
config = AdbImplConfig(
addr=f'{self.adb_ip}:{self.adb_port}',
connect=True,
disconnect=True,
device_serial=self.adb_name,
timeout=host_config.timeout
if recipe == 'nemu_ipc' and isinstance(host_config, MuMu12HostConfig):
# NemuImpl
nemu_path = Mumu12Host._read_install_path()
if not nemu_path:
raise RuntimeError("无法找到 MuMu12 的安装路径。")
nemu_config = NemuIpcImplConfig(
nemu_folder=nemu_path,
instance_id=int(self.id),
display_id=host_config.display_id,
target_package_name=host_config.target_package_name,
app_index=host_config.app_index
)
impl = cast(AdbBasedImpl, impl) # make pylance happy
return create_device(impl, config)
else:
raise ValueError(f'Unsupported device implementation for MuMu12: {impl}')
nemu_impl = NemuIpcImpl(nemu_config)
# AdbImpl
adb_impl = AdbImpl(connect_adb(
self.adb_ip,
self.adb_port,
timeout=host_config.timeout,
device_serial=self.adb_name
))
device = AndroidDevice()
device._screenshot = nemu_impl
device._touch = nemu_impl
device.commands = adb_impl
class Mumu12Host(HostProtocol):
return device
elif isinstance(host_config, AdbHostConfig) and is_adb_recipe(recipe):
return super().create_device(recipe, host_config)
else:
raise ValueError(f'Unknown recipe: {recipe}')
class Mumu12Host(HostProtocol[MuMu12Recipes]):
@staticmethod
@lru_cache(maxsize=1)
def _read_install_path() -> str | None:
"""
Reads the installation path (DisplayIcon) of MuMu Player 12 from the registry.
r"""
从注册表中读取 MuMu Player 12 的安装路径
:return: The path to the display icon if found, otherwise None.
返回的路径为根目录 `F:\Apps\Netease\MuMuPlayer-12.0`
:return: 若找到则返回安装路径否则返回 None
"""
if os.name != 'nt':
return None
@ -116,6 +155,9 @@ class Mumu12Host(HostProtocol):
icon_path = icon_path.replace('"', '')
path = os.path.dirname(icon_path)
logger.debug('MuMu Player 12 installation path: %s', path)
# 返回根目录(去掉 shell 子目录)
if os.path.basename(path).lower() == 'shell':
path = os.path.dirname(path)
return path
return None
@ -130,7 +172,7 @@ class Mumu12Host(HostProtocol):
install_path = Mumu12Host._read_install_path()
if install_path is None:
raise RuntimeError('MuMu Player 12 is not installed.')
manager_path = os.path.join(install_path, 'MuMuManager.exe')
manager_path = os.path.join(install_path, 'shell', 'MuMuManager.exe')
logger.debug('MuMuManager execute: %s', repr(args))
output = subprocess.run(
[manager_path] + args,
@ -184,6 +226,10 @@ class Mumu12Host(HostProtocol):
if instance.id == id:
return instance
return None
@staticmethod
def recipes() -> 'list[MuMu12Recipes]':
return ['adb', 'adb_raw', 'uiautomator2', 'nemu_ipc']
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')

View File

@ -195,7 +195,8 @@ class Instance(Generic[T_HostConfig], ABC):
def __repr__(self) -> str:
return f'{self.__class__.__name__}(name="{self.name}", id="{self.id}", adb="{self.adb_ip}:{self.adb_port}"({self.adb_name}))'
class HostProtocol(Protocol):
Recipe = TypeVar('Recipe', bound=str)
class HostProtocol(Generic[Recipe], Protocol):
@staticmethod
def installed() -> bool: ...
@ -205,6 +206,8 @@ class HostProtocol(Protocol):
@staticmethod
def query(*, id: str) -> Instance | None: ...
@staticmethod
def recipes() -> 'list[Recipe]': ...
if __name__ == '__main__':
pass

View File

@ -0,0 +1,55 @@
from abc import ABC
from typing import Literal
from typing_extensions import assert_never
from kotonebot import logging
from kotonebot.client.device import WindowsDevice
from .protocol import Device, WindowsHostConfig, RemoteWindowsHostConfig
logger = logging.getLogger(__name__)
WindowsRecipes = Literal['windows', 'remote_windows']
# Windows 相关的配置类型联合
WindowsHostConfigs = WindowsHostConfig | RemoteWindowsHostConfig
class CommonWindowsCreateDeviceMixin(ABC):
"""
通用 Windows 创建设备的 Mixin
Mixin 定义了创建 Windows 设备的通用接口
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
def create_device(self, recipe: WindowsRecipes, config: WindowsHostConfigs) -> Device:
"""
创建 Windows 设备
"""
match recipe:
case 'windows':
if not isinstance(config, WindowsHostConfig):
raise ValueError(f"Expected WindowsHostConfig for 'windows' recipe, got {type(config)}")
from kotonebot.client.implements.windows import WindowsImpl
d = WindowsDevice()
impl = WindowsImpl(
device=d,
window_title=config.window_title,
ahk_exe_path=config.ahk_exe_path
)
d._screenshot = impl
d._touch = impl
return d
case 'remote_windows':
if not isinstance(config, RemoteWindowsHostConfig):
raise ValueError(f"Expected RemoteWindowsHostConfig for 'remote_windows' recipe, got {type(config)}")
from kotonebot.client.implements.remote_windows import RemoteWindowsImpl
d = WindowsDevice()
impl = RemoteWindowsImpl(
device=d,
host=config.host,
port=config.port
)
d._screenshot = impl
d._touch = impl
return d
case _:
assert_never(f'Unsupported Windows recipe: {recipe}')

View File

@ -3,4 +3,5 @@ from . import adb # noqa: F401
from . import adb_raw # noqa: F401
from . import remote_windows # noqa: F401
from . import uiautomator2 # noqa: F401
from . import windows # noqa: F401
from . import windows # noqa: F401
from . import nemu_ipc # noqa: F401

View File

@ -9,7 +9,7 @@ from adbutils._device import AdbDevice as AdbUtilsDevice
from ..device import AndroidDevice
from ..protocol import AndroidCommandable, Touchable, Screenshotable
from ..registration import register_impl, ImplConfig
from ..registration import ImplConfig
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@ -83,46 +83,3 @@ class AdbImpl(AndroidCommandable, Touchable, Screenshotable):
if duration is not None:
logger.warning("Swipe duration is not supported with AdbDevice. Ignoring duration.")
self.adb.shell(f"input touchscreen swipe {x1} {y1} {x2} {y2}")
def _create_adb_device_base(config: AdbImplConfig, impl_class: type) -> AndroidDevice:
"""
通用的 ADB 设备创建工厂函数
其他任意基于 ADB Impl 可以直接复用这个函数
:param config: ADB 实现配置
:param impl_class: 实现类或工厂函数构造函数接收 adb_connection 参数
"""
from adbutils import adb
if config.disconnect:
logger.debug('adb disconnect %s', config.addr)
adb.disconnect(config.addr)
if config.connect:
logger.debug('adb connect %s', config.addr)
result = adb.connect(config.addr)
if 'cannot connect to' in result:
raise ValueError(result)
serial = config.device_serial or config.addr
logger.debug('adb wait for %s', serial)
adb.wait_for(serial, timeout=config.timeout)
devices = adb.device_list()
logger.debug('adb device_list: %s', devices)
d = [d for d in devices if d.serial == serial]
if len(d) == 0:
raise ValueError(f"Device {config.addr} not found")
d = d[0]
device = AndroidDevice(d)
impl = impl_class(d)
device._touch = impl
device._screenshot = impl
device.commands = impl
return device
@register_impl('adb', config_model=AdbImplConfig)
def create_adb_device(config: AdbImplConfig) -> AndroidDevice:
"""AdbImpl 工厂函数"""
return _create_adb_device_base(config, AdbImpl)

View File

@ -11,9 +11,8 @@ import numpy as np
from cv2.typing import MatLike
from adbutils._utils import adb_path
from .adb import AdbImpl, AdbImplConfig, _create_adb_device_base
from .adb import AdbImpl
from adbutils._device import AdbDevice as AdbUtilsDevice
from ..registration import register_impl
from kotonebot import logging
logger = logging.getLogger(__name__)
@ -157,10 +156,4 @@ class AdbRawImpl(AdbImpl):
logger.verbose(f"adb raw screenshot wait time: {time.time() - start_time:.4f}s")
data = self.__data
self.__data = None
return data
@register_impl('adb_raw', config_model=AdbImplConfig)
def create_adb_raw_device(config: AdbImplConfig):
"""AdbRawImpl 工厂函数"""
return _create_adb_device_base(config, AdbRawImpl)
return data

View File

@ -0,0 +1,8 @@
from .external_renderer_ipc import ExternalRendererIpc
from .nemu_ipc import NemuIpcImpl, NemuIpcImplConfig
__all__ = [
"ExternalRendererIpc",
"NemuIpcImpl",
"NemuIpcImplConfig",
]

View File

@ -0,0 +1,280 @@
import ctypes
import logging
import os
logger = logging.getLogger(__name__)
class NemuIpcIncompatible(RuntimeError):
"""MuMu12 IPC 环境不兼容或 DLL 加载失败"""
class ExternalRendererIpc:
r"""对 `external_renderer_ipc.dll` 的轻量封装。
该类仅处理 DLL 加载与原型声明并提供带有类型提示的薄包装方法
方便在其他模块中调用且保持类型安全
传入参数为 MuMu 根目录 F:\Apps\Netease\MuMuPlayer-12.0
"""
def __init__(self, mumu_root_folder: str):
if os.name != "nt":
raise NemuIpcIncompatible("ExternalRendererIpc only supports Windows.")
self.lib = self.__load_dll(mumu_root_folder)
self.raise_on_error: bool = True
"""是否在调用 DLL 函数失败时抛出异常。"""
self.__declare_prototypes()
def connect(self, nemu_folder: str, instance_id: int) -> int:
"""
建立连接
API 原型
`int nemu_connect(const wchar_t* path, int index)`
:param nemu_folder: 模拟器安装路径
:param instance_id: 模拟器实例 ID
:return: 成功返回连接 ID失败返回 0
"""
return self.lib.nemu_connect(nemu_folder, instance_id)
def disconnect(self, connect_id: int) -> None:
"""
断开连接
API 原型
`void nemu_disconnect(int handle)`
:param connect_id: 连接 ID
:return: 无返回值
"""
return self.lib.nemu_disconnect(connect_id)
def get_display_id(self, connect_id: int, pkg: str, app_index: int) -> int:
"""
获取指定包的 display id
API 原型
`int nemu_get_display_id(int handle, const char* pkg, int appIndex)`
:param connect_id: 连接 ID
:param pkg: 包名
:param app_index: 多开应用索引
:return: <0 表示失败>=0 表示有效 display id
"""
return self.lib.nemu_get_display_id(connect_id, pkg.encode('utf-8'), app_index)
def capture_display(
self,
connect_id: int,
display_id: int,
buf_len: int,
width_ptr: ctypes.c_void_p,
height_ptr: ctypes.c_void_p,
buffer_ptr: ctypes.c_void_p,
) -> int:
"""
截取指定显示屏内容
API 原型
`int nemu_capture_display(int handle, unsigned int displayid, int buffer_size, int *width, int *height, unsigned char* pixels)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param buf_len: 缓冲区长度字节
:param width_ptr: 用于接收宽度的指针ctypes.c_void_p/int 指针
:param height_ptr: 用于接收高度的指针ctypes.c_void_p/int 指针
:param buffer_ptr: 用于接收像素数据的指针ctypes.c_void_p/unsigned char* 指针
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_capture_display(
connect_id,
display_id,
buf_len,
width_ptr,
height_ptr,
buffer_ptr,
)
def input_text(self, connect_id: int, text: str) -> int:
"""
输入文本
API 原型
`int nemu_input_text(int handle, int size, const char* buf)`
:param connect_id: 连接 ID
:param text: 输入文本utf-8
:return: 0 表示成功>0 表示失败
"""
buf = text.encode('utf-8')
return self.lib.nemu_input_text(connect_id, len(buf), buf)
def input_touch_down(self, connect_id: int, display_id: int, x: int, y: int) -> int:
"""
发送触摸按下事件
API 原型
`int nemu_input_event_touch_down(int handle, int displayid, int x_point, int y_point)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param x: 触摸点 X 坐标
:param y: 触摸点 Y 坐标
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_touch_down(connect_id, display_id, x, y)
def input_touch_up(self, connect_id: int, display_id: int) -> int:
"""
发送触摸抬起事件
API 原型
`int nemu_input_event_touch_up(int handle, int displayid)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_touch_up(connect_id, display_id)
def input_key_down(self, connect_id: int, display_id: int, key_code: int) -> int:
"""
发送按键按下事件
API 原型
`int nemu_input_event_key_down(int handle, int displayid, int key_code)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param key_code: 按键码
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_key_down(connect_id, display_id, key_code)
def input_key_up(self, connect_id: int, display_id: int, key_code: int) -> int:
"""
发送按键抬起事件
API 原型
`int nemu_input_event_key_up(int handle, int displayid, int key_code)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param key_code: 按键码
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_key_up(connect_id, display_id, key_code)
def input_finger_touch_down(self, connect_id: int, display_id: int, finger_id: int, x: int, y: int) -> int:
"""
多指触摸按下
API 原型
`int nemu_input_event_finger_touch_down(int handle, int displayid, int finger_id, int x_point, int y_point)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param finger_id: 手指编号1-10
:param x: 触摸点 X 坐标
:param y: 触摸点 Y 坐标
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_finger_touch_down(connect_id, display_id, finger_id, x, y)
def input_finger_touch_up(self, connect_id: int, display_id: int, finger_id: int) -> int:
"""
多指触摸抬起
API 原型
`int nemu_input_event_finger_touch_up(int handle, int displayid, int slot_id)`
:param connect_id: 连接 ID
:param display_id: 显示屏 ID
:param finger_id: 手指编号1-10
:return: 0 表示成功>0 表示失败
"""
return self.lib.nemu_input_event_finger_touch_up(connect_id, display_id, finger_id)
# ------------------------------------------------------------------
# 内部工具
# ------------------------------------------------------------------
def __load_dll(self, mumu_root_folder: str) -> ctypes.CDLL:
"""尝试多条路径加载 DLL。传入为 MuMu 根目录。"""
candidate_paths = [
os.path.join(mumu_root_folder, "shell", "sdk", "external_renderer_ipc.dll"),
os.path.join(
mumu_root_folder,
"shell",
"nx_device",
"12.0",
"sdk",
"external_renderer_ipc.dll",
),
]
for p in candidate_paths:
if not os.path.exists(p):
continue
try:
return ctypes.CDLL(p)
except OSError as e: # pragma: no cover
logger.warning("Failed to load DLL (%s): %s", p, e)
raise NemuIpcIncompatible("external_renderer_ipc.dll not found or failed to load.")
def __declare_prototypes(self) -> None:
"""声明 DLL 函数原型,确保 ctypes 类型安全。"""
# 连接 / 断开
self.lib.nemu_connect.argtypes = [ctypes.c_wchar_p, ctypes.c_int]
self.lib.nemu_connect.restype = ctypes.c_int
self.lib.nemu_disconnect.argtypes = [ctypes.c_int]
self.lib.nemu_disconnect.restype = None
# 获取 display id
self.lib.nemu_get_display_id.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_int]
self.lib.nemu_get_display_id.restype = ctypes.c_int
# 截图
self.lib.nemu_capture_display.argtypes = [
ctypes.c_int,
ctypes.c_uint,
ctypes.c_int,
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p,
]
self.lib.nemu_capture_display.restype = ctypes.c_int
# 输入文本
self.lib.nemu_input_text.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_char_p]
self.lib.nemu_input_text.restype = ctypes.c_int
# 触摸
self.lib.nemu_input_event_touch_down.argtypes = [
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
]
self.lib.nemu_input_event_touch_down.restype = ctypes.c_int
self.lib.nemu_input_event_touch_up.argtypes = [ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_touch_up.restype = ctypes.c_int
# 按键
self.lib.nemu_input_event_key_down.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_key_down.restype = ctypes.c_int
self.lib.nemu_input_event_key_up.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_key_up.restype = ctypes.c_int
# 多指触摸
self.lib.nemu_input_event_finger_touch_down.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_finger_touch_down.restype = ctypes.c_int
self.lib.nemu_input_event_finger_touch_up.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int]
self.lib.nemu_input_event_finger_touch_up.restype = ctypes.c_int
logger.debug("DLL function prototypes declared")

View File

@ -0,0 +1,327 @@
import os
import ctypes
import logging
import time
from dataclasses import dataclass
from time import sleep
from typing import Literal
from typing_extensions import override
import cv2
import numpy as np
from cv2.typing import MatLike
from ...device import AndroidDevice, Device
from ...protocol import Touchable, Screenshotable
from ...registration import ImplConfig
from .external_renderer_ipc import ExternalRendererIpc
from kotonebot.errors import KotonebotError
logger = logging.getLogger(__name__)
class NemuIpcIncompatible(Exception):
"""MuMu12 版本过低或 dll 不兼容"""
pass
class NemuIpcError(KotonebotError):
"""调用 IPC 过程中发生错误"""
pass
@dataclass
class NemuIpcImplConfig(ImplConfig):
"""nemu_ipc 能力的配置模型。"""
nemu_folder: str
r"""MuMu12 根目录(如 F:\Apps\Netease\MuMuPlayer-12.0)。"""
instance_id: int
"""模拟器实例 ID。"""
display_id: int | None = 0
"""目标显示器 ID默认为 0主显示器。若为 None 且设置了 target_package_name则自动获取对应的 display_id。"""
target_package_name: str | None = None
"""目标应用包名,用于自动获取 display_id。"""
app_index: int = 0
"""多开应用索引,传给 get_display_id 方法。"""
wait_package_timeout: float = 60 # 单位秒,-1 表示永远等待0 表示不等待,立即抛出异常
wait_package_interval: float = 0.1 # 单位秒
class NemuIpcImpl(Touchable, Screenshotable):
"""
利用 MuMu12 提供的 external_renderer_ipc.dll 进行截图与触摸控制
"""
def __init__(self, config: NemuIpcImplConfig):
self.config = config
self.__width: int = 0
self.__height: int = 0
self.__connected: bool = False
self._connect_id: int = 0
self.nemu_folder = config.nemu_folder
# --------------------------- DLL 封装 ---------------------------
self._ipc = ExternalRendererIpc(config.nemu_folder)
logger.info("ExternalRendererIpc initialized and DLL loaded")
@property
def width(self) -> int:
"""
屏幕宽度
若为 0表示未连接或未获取到分辨率
"""
return self.__width
@property
def height(self) -> int:
"""
屏幕高度
若为 0表示未连接或未获取到分辨率
"""
return self.__height
@property
def connected(self) -> bool:
"""是否已连接。"""
return self.__connected
# ------------------------------------------------------------------
# 基础控制
# ------------------------------------------------------------------
def _ensure_connected(self) -> None:
if not self.__connected:
self.connect()
def _get_display_id(self) -> int:
"""获取有效的 display_id。"""
# 如果配置中直接指定了 display_id直接返回
if self.config.display_id is not None:
return self.config.display_id
# 如果设置了 target_package_name实时获取 display_id
if self.config.target_package_name:
self._ensure_connected()
timeout = self.config.wait_package_timeout
interval = self.config.wait_package_interval
if timeout == -1:
timeout = float('inf')
start_time = time.time()
while True:
display_id = self._ipc.get_display_id(
self._connect_id,
self.config.target_package_name,
self.config.app_index
)
if display_id >= 0:
return display_id
elif display_id == -1:
# 可以继续等
pass
else:
# 未知错误
raise NemuIpcError(f"Failed to get display_id for package '{self.config.target_package_name}', error code={display_id}")
if time.time() - start_time > timeout:
break
sleep(interval)
raise NemuIpcError(f"Failed to get display_id for package '{self.config.target_package_name}' within {timeout}s")
# 如果都没有设置,抛出错误
raise NemuIpcError("display_id is None and target_package_name is not set. Please set display_id or target_package_name in config.")
def connect(self) -> None:
"""连接模拟器。"""
if self.__connected:
return
connect_id = self._ipc.connect(self.nemu_folder, self.config.instance_id)
if connect_id == 0:
raise NemuIpcError("nemu_connect failed, please check if the emulator is running and the instance ID is correct.")
self._connect_id = connect_id
self.__connected = True
logger.debug("NemuIpc connected, connect_id=%d", connect_id)
def disconnect(self) -> None:
"""断开连接。"""
if not self.__connected:
return
self._ipc.disconnect(self._connect_id)
self.__connected = False
self._connect_id = 0
logger.debug("NemuIpc disconnected.")
# ------------------------------------------------------------------
# Screenshotable 接口实现
# ------------------------------------------------------------------
@property
def screen_size(self) -> tuple[int, int]:
"""获取屏幕分辨率。"""
if self.__width == 0 or self.__height == 0:
self._refresh_resolution()
if self.__width == 0 or self.__height == 0:
raise NemuIpcError("Screen resolution not obtained, please connect to the emulator first.")
return self.__width, self.__height
@override
def detect_orientation(self):
return self.get_display_orientation(self._get_display_id())
def get_display_orientation(self, display_id: int = 0) -> Literal['portrait', 'landscape'] | None:
"""获取指定显示屏的方向。"""
width, height = self.query_resolution(display_id)
if width > height:
return "landscape"
if height > width:
return "portrait"
return None
@override
def screenshot(self) -> MatLike:
self._ensure_connected()
# 必须每次都更新分辨率,因为屏幕可能会旋转
self._refresh_resolution()
length = self.__width * self.__height * 4 # RGBA
buf_type = ctypes.c_ubyte * length
buffer = buf_type()
w_ptr = ctypes.pointer(ctypes.c_int(self.__width))
h_ptr = ctypes.pointer(ctypes.c_int(self.__height))
ret = self._ipc.capture_display(
self._connect_id,
self._get_display_id(),
length,
ctypes.cast(w_ptr, ctypes.c_void_p),
ctypes.cast(h_ptr, ctypes.c_void_p),
ctypes.cast(buffer, ctypes.c_void_p),
)
if ret != 0:
raise NemuIpcError(f"nemu_capture_display screenshot failed, error code={ret}")
# 读入并转换数据
img = np.ctypeslib.as_array(buffer).reshape((self.__height, self.__width, 4))
# RGBA -> BGR
img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
cv2.flip(img, 0, dst=img)
return img
# --------------------------- 内部工具 -----------------------------
def _refresh_resolution(self) -> None:
"""刷新分辨率信息。"""
display_id = self._get_display_id()
self.__width, self.__height = self.query_resolution(display_id)
def query_resolution(self, display_id: int = 0) -> tuple[int, int]:
"""
查询指定显示屏的分辨率
:param display_id: 显示屏 ID
:return: 分辨率 (width, height)
:raise NemuIpcError: 查询失败
"""
self._ensure_connected()
w_ptr = ctypes.pointer(ctypes.c_int(0))
h_ptr = ctypes.pointer(ctypes.c_int(0))
ret = self._ipc.capture_display(
self._connect_id,
display_id,
0,
ctypes.cast(w_ptr, ctypes.c_void_p),
ctypes.cast(h_ptr, ctypes.c_void_p),
ctypes.c_void_p(),
)
if ret != 0:
raise NemuIpcError(f"Call nemu_capture_display failed. Return value={ret}")
return w_ptr.contents.value, h_ptr.contents.value
# ------------------------------------------------------------------
# Touchable 接口实现
# ------------------------------------------------------------------
def __convert_pos(self, x: int, y: int) -> tuple[int, int]:
# Android 显示屏有两套坐标:逻辑坐标与物理坐标。
# 逻辑坐标原点始终是画面左上角,而物理坐标原点则始终是显示屏的左上角。
# 如果屏幕画面旋转,会导致两个坐标的原点不同,坐标也不同。
# ========
# 这里传给 MuMu 的是逻辑坐标ExternalRendererIpc DLL 内部会
# 自动判断旋转,并转换为物理坐标。但是这部分有个 bug
# 旋转没有考虑到多显示器,只是以主显示器为准,若两个显示器旋转不一致,
# 会导致错误地转换坐标。因此需要在 Python 层面 workaround 这个问题。
# 通过判断主显示器与当前显示器的旋转,将坐标进行预转换,抵消 DLL 层的错误转换。
display_id = self._get_display_id()
if display_id == 0:
return x, y
else:
primary = self.get_display_orientation(0)
primary_size = self.query_resolution(0)
current = self.get_display_orientation(display_id)
if primary == current:
return x, y
else:
# 如果旋转不一致,视为顺时针旋转了 90°
# 因此我们要提前逆时针旋转 90°
self._refresh_resolution()
x, y = y, primary_size[1] - x
return x, y
@override
def click(self, x: int, y: int) -> None:
self._ensure_connected()
display_id = self._get_display_id()
x, y = self.__convert_pos(x, y)
self._ipc.input_touch_down(self._connect_id, display_id, x, y)
sleep(0.01)
self._ipc.input_touch_up(self._connect_id, display_id)
@override
def swipe(
self,
x1: int,
y1: int,
x2: int,
y2: int,
duration: float | None = None,
) -> None:
self._ensure_connected()
duration = duration or 0.3
steps = max(int(duration / 0.01), 2)
display_id = self._get_display_id()
x1, y1 = self.__convert_pos(x1, y1)
x2, y2 = self.__convert_pos(x2, y2)
xs = np.linspace(x1, x2, steps, dtype=int)
ys = np.linspace(y1, y2, steps, dtype=int)
# 按下第一点
self._ipc.input_touch_down(self._connect_id, display_id, xs[0], ys[0])
sleep(0.01)
# 中间移动
for px, py in zip(xs[1:-1], ys[1:-1]):
self._ipc.input_touch_down(self._connect_id, display_id, px, py)
sleep(0.01)
# 最终抬起
self._ipc.input_touch_up(self._connect_id, display_id)
sleep(0.01)
if __name__ == '__main__':
nemu = NemuIpcImpl(NemuIpcImplConfig(
r'F:\Apps\Netease\MuMuPlayer-12.0', 0, None,
target_package_name='com.android.chrome',
))
nemu.connect()
# while True:
# nemu.click(0, 0)
nemu.click(100, 100)
nemu.click(100*3, 100)
nemu.click(100*3, 100*3)

View File

@ -23,8 +23,8 @@ from cv2.typing import MatLike
from kotonebot import logging
from ..device import Device, WindowsDevice
from ..protocol import Touchable, Screenshotable
from ..registration import register_impl, ImplConfig
from .windows import WindowsImpl, WindowsImplConfig, create_windows_device
from ..registration import ImplConfig
from .windows import WindowsImpl, WindowsImplConfig
logger = logging.getLogger(__name__)
@ -63,7 +63,11 @@ class RemoteWindowsServer:
self.port = port
self.server = None
self.device = WindowsDevice()
self.impl = create_windows_device(windows_impl_config)
self.impl = WindowsImpl(
WindowsDevice(),
ahk_exe_path=windows_impl_config.ahk_exe_path,
window_title=windows_impl_config.window_title
)
self.device._screenshot = self.impl
self.device._touch = self.impl
@ -186,14 +190,4 @@ class RemoteWindowsImpl(Touchable, Screenshotable):
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float | None = None) -> None:
"""Swipe from (x1, y1) to (x2, y2) on the remote server."""
if not self.proxy.swipe(x1, y1, x2, y2, duration):
raise RuntimeError(f"Failed to swipe from ({x1}, {y1}) to ({x2}, {y2})")
# 编写并注册创建函数
@register_impl('remote_windows', config_model=RemoteWindowsImplConfig)
def create_remote_windows_device(config: RemoteWindowsImplConfig) -> Device:
device = WindowsDevice()
remote_impl = RemoteWindowsImpl(device, config.host, config.port)
device._touch = remote_impl
device._screenshot = remote_impl
return device
raise RuntimeError(f"Failed to swipe from ({x1}, {y1}) to ({x2}, {y2})")

View File

@ -4,21 +4,19 @@ from typing import Literal
import numpy as np
import uiautomator2 as u2
from cv2.typing import MatLike
from adbutils._device import AdbDevice as AdbUtilsDevice
from kotonebot import logging
from ..device import Device
from ..protocol import Screenshotable, Commandable, Touchable
from ..registration import register_impl
from .adb import AdbImplConfig, _create_adb_device_base
logger = logging.getLogger(__name__)
SCREENSHOT_INTERVAL = 0.2
class UiAutomator2Impl(Screenshotable, Commandable, Touchable):
def __init__(self, device: Device):
self.device = device
self.u2_client = u2.Device(device.adb.serial)
def __init__(self, adb_connection: AdbUtilsDevice):
self.u2_client = u2.Device(adb_connection.serial)
self.__last_screenshot_time = 0
def screenshot(self) -> MatLike:
@ -40,10 +38,7 @@ class UiAutomator2Impl(Screenshotable, Commandable, Touchable):
def screen_size(self) -> tuple[int, int]:
info = self.u2_client.info
sizes = info['displayWidth'], info['displayHeight']
if self.device.orientation == 'landscape':
return (max(sizes), min(sizes))
else:
return (min(sizes), max(sizes))
return sizes
def detect_orientation(self) -> Literal['portrait', 'landscape'] | None:
"""
@ -84,10 +79,4 @@ class UiAutomator2Impl(Screenshotable, Commandable, Touchable):
"""
滑动屏幕
"""
self.u2_client.swipe(x1, y1, x2, y2, duration=duration or 0.1)
@register_impl('uiautomator2', config_model=AdbImplConfig)
def create_uiautomator2_device(config: AdbImplConfig) -> Device:
"""UiAutomator2Impl 工厂函数"""
return _create_adb_device_base(config, UiAutomator2Impl)
self.u2_client.swipe(x1, y1, x2, y2, duration=duration or 0.1)

View File

@ -13,7 +13,7 @@ from cv2.typing import MatLike
from ..device import Device, WindowsDevice
from ..protocol import Commandable, Touchable, Screenshotable
from ..registration import register_impl, ImplConfig
from ..registration import ImplConfig
# 1. 定义配置模型
@dataclass
@ -51,15 +51,6 @@ class WindowsImpl(Touchable, Screenshotable):
# 将点击坐标设置为相对 Client
self.ahk.set_coord_mode('Mouse', 'Client')
@cached_property
def scale_ratio(self) -> float:
"""
缩放比例截图与模拟输入前都会根据这个比例缩放
"""
left, _, right, _ = self.__client_rect()
w = right - left
return 720 / w
@property
def hwnd(self) -> int:
if self.__hwnd is None:
@ -124,18 +115,14 @@ class WindowsImpl(Touchable, Screenshotable):
# 将 RGBA 转换为 RGB
cropped_im = cv2.cvtColor(cropped_im, cv2.COLOR_RGBA2RGB)
# 缩放
cropped_im = cv2.resize(cropped_im, None, fx=self.scale_ratio, fy=self.scale_ratio)
return cropped_im
@property
def screen_size(self) -> tuple[int, int]:
# 因为截图和点击的坐标都被缩放了,
# 因此这里只要返回固定值即可
if self.device.orientation == 'landscape':
return 1280, 720
else:
return 720, 1280
left, top, right, bot = self.__client_rect()
w = right - left
h = bot - top
return w, h
def detect_orientation(self) -> None | Literal['portrait'] | Literal['landscape']:
pos = self.ahk.win_get_position(self.window_title)
@ -154,7 +141,6 @@ class WindowsImpl(Touchable, Screenshotable):
x = 2
if y == 0:
y = 2
x, y = int(x / self.scale_ratio), int(y / self.scale_ratio)
if not self.ahk.win_is_active(self.window_title):
self.ahk.win_activate(self.window_title)
self.ahk.click(x, y)
@ -162,26 +148,9 @@ class WindowsImpl(Touchable, Screenshotable):
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float | None = None) -> None:
if not self.ahk.win_is_active(self.window_title):
self.ahk.win_activate(self.window_title)
x1, y1 = int(x1 / self.scale_ratio), int(y1 / self.scale_ratio)
x2, y2 = int(x2 / self.scale_ratio), int(y2 / self.scale_ratio)
# TODO: 这个 speed 的单位是什么?
self.ahk.mouse_drag(x2, y2, from_position=(x1, y1), coord_mode='Client', speed=10)
# 3. 编写并注册创建函数
@register_impl('windows', config_model=WindowsImplConfig)
def create_windows_device(config: WindowsImplConfig) -> Device:
device = WindowsDevice()
impl = WindowsImpl(
device,
window_title=config.window_title,
ahk_exe_path=config.ahk_exe_path
)
device._touch = impl
device._screenshot = impl
return device
if __name__ == '__main__':
from ..device import Device
device = Device()

View File

@ -7,9 +7,10 @@ if TYPE_CHECKING:
from .implements.adb import AdbImplConfig
from .implements.remote_windows import RemoteWindowsImplConfig
from .implements.windows import WindowsImplConfig
from .implements.nemu_ipc import NemuIpcImplConfig
AdbBasedImpl = Literal['adb', 'adb_raw', 'uiautomator2']
DeviceImpl = str | AdbBasedImpl | Literal['windows', 'remote_windows']
DeviceImpl = str | AdbBasedImpl | Literal['windows', 'remote_windows', 'nemu_ipc']
# --- 核心类型定义 ---
@ -21,69 +22,3 @@ class ImplRegistrationError(KotonebotError):
class ImplConfig:
"""所有设备实现配置模型的名义上的基类,便于类型约束。"""
pass
T_Config = TypeVar("T_Config", bound=ImplConfig)
# 定义两种创建者函数类型
CreatorWithConfig = Callable[[Any], Device]
CreatorWithoutConfig = Callable[[], Device]
# --- 底层 API: 公开的注册表 ---
# 注册表结构: {'impl_name': (创建函数, 配置模型类 或 None)}
DEVICE_CREATORS: Dict[str, tuple[Callable[..., Device], Type[ImplConfig] | None]] = {}
def register_impl(name: str, config_model: Type[ImplConfig] | None = None) -> Callable[..., Any]:
"""
一个统一的装饰器用于向 DEVICE_CREATORS 注册表中注册一个设备实现
:param name: 实现的名称 (e.g., 'windows', 'adb')
:param config_model: (可选) 与该实现关联的 dataclass 配置模型
"""
def decorator(creator_func: Callable[..., Device]) -> Callable[..., Device]:
if name in DEVICE_CREATORS:
raise ImplRegistrationError(f"实现 '{name}' 已被注册。")
DEVICE_CREATORS[name] = (creator_func, config_model)
return creator_func
return decorator
# --- 高层 API: 带 overload 的便利函数 ---
# 为需要配置的已知 impl 提供 overload
@overload
def create_device(impl_name: Literal['windows'], config: 'WindowsImplConfig') -> Device: ...
@overload
def create_device(impl_name: Literal['remote_windows'], config: 'RemoteWindowsImplConfig') -> Device: ...
@overload
def create_device(impl_name: AdbBasedImpl, config: 'AdbImplConfig') -> Device: ...
# 函数的实际实现
def create_device(impl_name: DeviceImpl, config: ImplConfig | None = None) -> Device:
"""
根据名称和可选的配置对象统一创建设备
"""
creator_tuple = DEVICE_CREATORS.get(impl_name)
if not creator_tuple:
raise ImplRegistrationError(f"未找到名为 '{impl_name}' 的实现。")
creator_func, registered_config_model = creator_tuple
# 情况 A: 实现需要配置
if registered_config_model is not None:
creator_with_config = cast(CreatorWithConfig, creator_func)
if config is None:
raise ValueError(f"实现 '{impl_name}' 需要一个配置对象,但传入的是 None。")
if not isinstance(config, registered_config_model):
raise TypeError(f"'{impl_name}' 传入的配置类型错误,应为 '{registered_config_model.__name__}',实际为 '{type(config).__name__}'")
return creator_with_config(config)
# 情况 B: 实现无需配置
else:
creator_without_config = cast(CreatorWithoutConfig, creator_func)
if config is not None:
print(f"提示:实现 '{impl_name}' 无需配置,但你提供了一个配置对象,它将被忽略。")
return creator_without_config()

View File

@ -3,10 +3,10 @@ from typing import Generic, TypeVar, Literal
from pydantic import BaseModel, ConfigDict
from kotonebot.client import DeviceImpl
T = TypeVar('T')
BackendType = Literal['custom', 'mumu12', 'leidian', 'dmm']
DeviceRecipes = Literal['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows', 'nemu_ipc']
class ConfigBaseModel(BaseModel):
model_config = ConfigDict(use_attribute_docstrings=True)
@ -27,7 +27,7 @@ class BackendConfig(ConfigBaseModel):
雷电模拟器需要设置正确的模拟器名否则 自动启动模拟器 功能将无法正常工作
其他功能不受影响
"""
screenshot_impl: DeviceImpl = 'adb'
screenshot_impl: DeviceRecipes = 'adb'
"""
截图方法暂时推荐使用adb截图方式
@ -48,6 +48,8 @@ class BackendConfig(ConfigBaseModel):
"""Windows 截图方式的窗口标题"""
windows_ahk_path: str | None = None
"""Windows 截图方式的 AutoHotkey 可执行文件路径,为 None 时使用默认路径"""
mumu_background_mode: bool = False
"""MuMu12 模拟器后台保活模式"""
class PushConfig(ConfigBaseModel):
"""推送配置。"""

View File

@ -1,8 +1,10 @@
import sys
sys.path.append('./projects')
import runpy
import logging
import argparse
from kotonebot.kaa.common import BaseConfig
from kaa.common import BaseConfig
def run_script(script_path: str) -> None:
@ -12,14 +14,14 @@ def run_script(script_path: str) -> None:
Args:
script_path: Python 脚本的路径
"""
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
# 获取模块名
module_name = script_path.strip('.py').replace('\\', '/').strip('/').replace('/', '.')
module_name = script_path.strip('.py').lstrip('projects/').replace('\\', '/').strip('/').replace('/', '.')
print(f"正在运行脚本: {script_path}")
# 运行脚本
from kotonebot.backend.context import init_context, manual_context
from kotonebot.kaa.main.kaa import Kaa
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
from kaa.main.kaa import Kaa
logging.getLogger('kotonebot').setLevel(logging.DEBUG)
config_path = './config.json'
kaa_instance = Kaa(config_path)

View File

@ -24,3 +24,10 @@ class TaskNotFoundError(KotonebotError):
def __init__(self, task_id: str):
self.task_id = task_id
super().__init__(f'Task "{task_id}" not found.')
class UnscalableResolutionError(KotonebotError):
def __init__(self, target_resolution: tuple[int, int], screen_size: tuple[int, int]):
self.target_resolution = target_resolution
self.screen_size = screen_size
super().__init__(f'Cannot scale to target resolution {target_resolution}. '
f'Screen size: {screen_size}')

View File

@ -1,18 +1,15 @@
from importlib import resources
from typing_extensions import override
from kotonebot.client import Device, DeviceImpl
from kotonebot.client.registration import create_device
from kotonebot.client.implements.windows import WindowsImplConfig
from kotonebot.client.implements.remote_windows import RemoteWindowsImplConfig
from kotonebot.client import Device
from kotonebot.client.host import HostProtocol, Instance
from kotonebot.client.host.protocol import WindowsHostConfig, RemoteWindowsHostConfig
from kotonebot.client.host.windows_common import WindowsRecipes, WindowsHostConfigs, CommonWindowsCreateDeviceMixin
DmmHostConfigs = WindowsHostConfig | RemoteWindowsHostConfig
DmmHostConfigs = WindowsHostConfigs
DmmRecipes = WindowsRecipes
# TODO: 可能应该把 start_game 和 end_game 里对启停的操作移动到这里来
class DmmInstance(Instance[DmmHostConfigs]):
class DmmInstance(CommonWindowsCreateDeviceMixin, Instance[DmmHostConfigs]):
def __init__(self):
super().__init__('dmm', 'gakumas')
@ -37,31 +34,11 @@ class DmmInstance(Instance[DmmHostConfigs]):
raise NotImplementedError()
@override
def create_device(self, impl: DeviceImpl, host_config: DmmHostConfigs) -> Device:
if impl == 'windows':
assert isinstance(host_config, WindowsHostConfig)
win_config = WindowsImplConfig(
window_title=host_config.window_title,
ahk_exe_path=host_config.ahk_exe_path
)
return create_device(impl, win_config)
def create_device(self, impl: DmmRecipes, host_config: DmmHostConfigs) -> Device:
"""为 DMM 实例创建 Device。"""
return super().create_device(impl, host_config)
elif impl == 'remote_windows':
assert isinstance(host_config, RemoteWindowsHostConfig)
config = RemoteWindowsImplConfig(
windows_impl_config=WindowsImplConfig(
window_title=host_config.windows_host_config.window_title,
ahk_exe_path=host_config.windows_host_config.ahk_exe_path
),
host=host_config.host,
port=host_config.port
)
return create_device(impl, config)
else:
raise ValueError(f'Unsupported impl for DMM: {impl}')
class DmmHost(HostProtocol):
class DmmHost(HostProtocol[DmmRecipes]):
instance = DmmInstance()
"""DmmInstance 单例。"""
@ -77,3 +54,7 @@ class DmmHost(HostProtocol):
@staticmethod
def query(*, id: str) -> Instance | None:
raise NotImplementedError()
@staticmethod
def recipes() -> 'list[DmmRecipes]':
return ['windows', 'remote_windows']

View File

@ -2,6 +2,7 @@ import os
import traceback
import zipfile
import logging
import copy
from functools import partial
from itertools import chain
from datetime import datetime, timedelta
@ -33,6 +34,7 @@ ConfigKey = Literal[
'check_emulator', 'emulator_path',
'adb_emulator_name', 'emulator_args',
'_mumu_index', '_leidian_index',
'mumu_background_mode',
# purchase
'purchase_enabled',
@ -94,17 +96,30 @@ ConfigSetFunction = Callable[[BaseConfig, Dict[ConfigKey, Any]], None]
ConfigBuilderReturnValue = Tuple[ConfigSetFunction, Dict[ConfigKey, GradioInput]]
def _save_bug_report(
title: str,
description: str,
version: str,
upload: bool,
path: str | None = None
) -> Generator[str, None, str]:
"""
保存报告
:param path: 保存的路径若为 `None`则保存到 `./reports/bug-{YY-MM-DD HH-MM-SS}.zip`
:param title: 标题
:param description: 描述
:param version: 版本号
:param upload: 是否上传
:param path: 保存的路径若为 `None`则保存到 `./reports/bug-YY-MM-DD HH-MM-SS_标题.zip`
:return: 保存的路径
"""
from kotonebot import device
from kotonebot.backend.context import ContextStackVars
import re
# 过滤标题中的非法文件名字符
def sanitize_filename(s: str) -> str:
# 替换 \/:*?"<>| 为空或下划线
return re.sub(r'[\\/:*?"<>|]', '_', s)
# 确保目录存在
os.makedirs('logs', exist_ok=True)
@ -112,8 +127,18 @@ def _save_bug_report(
error = ""
if path is None:
path = f'./reports/bug-{datetime.now().strftime("%y-%m-%d %H-%M-%S")}.zip'
safe_title = sanitize_filename(title)[:30] or "无标题"
timestamp = datetime.now().strftime("%y-%m-%d-%H-%M-%S")
path = f'./reports/bug_{timestamp}_{safe_title}.zip'
with zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED, compresslevel=9) as zipf:
# 打包描述文件
yield "### 打包描述文件..."
try:
description_content = f"标题:{title}\n类型bug\n内容:\n{description}"
zipf.writestr('description.txt', description_content.encode('utf-8'))
except Exception as e:
error += f"保存描述文件失败:{str(e)}\n"
# 打包截图
yield "### 打包上次截图..."
try:
@ -158,12 +183,16 @@ def _save_bug_report(
# 写出版本号
zipf.writestr('version.txt', version)
if not upload:
yield f"### 报告已保存至 {os.path.abspath(path)}"
return path
# 上传报告
from kotonebot.ui.file_host.sensio import upload
from kotonebot.ui.file_host.sensio import upload as upload_file
yield "### 上传报告..."
url = ''
try:
url = upload(path)
url = upload_file(path)
except Exception as e:
yield f"### 上传报告失败:{str(e)}\n\n"
return ''
@ -371,9 +400,41 @@ class KotoneBotUI:
assert key in CONFIG_KEY_VALUE, f"未知的配置项:{key}"
key = cast(ConfigKey, key)
data[key] = value
# 设置结果
# 先设置options
for (set_func, _) in return_values:
set_func(options, data)
# 验证规则1截图方法验证
screenshot_method = self.current_config.backend.screenshot_impl
backend_type = self.current_config.backend.type
valid_screenshot_methods = {
'mumu12': ['adb', 'adb_raw', 'uiautomator2', 'nemu_ipc'],
'leidian': ['adb', 'adb_raw', 'uiautomator2'],
'custom': ['adb', 'adb_raw', 'uiautomator2'],
'dmm': ['remote_windows', 'windows']
}
if screenshot_method not in valid_screenshot_methods.get(backend_type, []):
gr.Warning(f"截图方法 '{screenshot_method}' 不适用于当前选择的模拟器类型,配置未保存。")
return ""
# 验证规则2若启用培育那么培育偶像不能为空
if options.produce.enabled and not options.produce.idols:
gr.Warning("启用培育时,培育偶像不能为空,配置未保存。")
return ""
# 验证规则3若启用AP/金币购买,对应的商品不能为空
if options.purchase.ap_enabled and not options.purchase.ap_items:
gr.Warning("启用AP购买时AP商店购买物品不能为空配置未保存。")
return ""
if options.purchase.money_enabled and not options.purchase.money_items:
gr.Warning("启用金币购买时,金币商店购买物品不能为空,配置未保存。")
return ""
# 验证通过,保存配置
self.current_config.options = options
try:
save_config(self.config, "config.json")
@ -393,7 +454,14 @@ class KotoneBotUI:
if self._kaa.upgrade_msg:
gr.Markdown('### 配置升级报告')
gr.Markdown(self._kaa.upgrade_msg)
gr.Markdown('脚本报错或者卡住?点击"日志"选项卡中的"一键导出报告"可以快速反馈!')
gr.Markdown('脚本报错或者卡住?前往"反馈"选项卡可以快速导出报告!')
# 添加调试模式警告
if self.current_config.keep_screenshots:
gr.Markdown(
'<div style="color: red; font-size: larger;">当前启用了调试功能「保留截图数据」,调试结束后正常使用时建议关闭此选项!</div>',
elem_classes=["debug-warning"]
)
task_status = gr.Dataframe(
headers=["任务", "状态"],
@ -555,7 +623,9 @@ class KotoneBotUI:
# choices = ['windows', 'remote_windows']
# else: # Mumu, Leidian, Custom
# choices = ['adb', 'adb_raw', 'uiautomator2']
choices = ['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows']
# else:
# raise ValueError(f'Unsupported backend type: {type_in_config}')
choices = ['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows', 'nemu_ipc']
if impl_value not in choices:
new_value = choices[0]
else:
@ -576,13 +646,21 @@ class KotoneBotUI:
choices=[(i.name, i.id) for i in instances],
interactive=True
)
mumu_background_mode = gr.Checkbox(
label="MuMu12 模拟器后台保活模式",
value=self.current_config.backend.mumu_background_mode,
info=BackendConfig.model_fields['mumu_background_mode'].description,
interactive=True
)
except: # noqa: E722
logger.exception('Failed to list installed MuMu12')
gr.Markdown('获取 MuMu12 模拟器列表失败,请升级模拟器到最新版本。若问题依旧,前往 QQ 群、QQ 频道或 Github 反馈 bug。')
mumu_instance = gr.Dropdown(visible=False)
mumu_background_mode = gr.Checkbox(visible=False)
else:
# 为了让 return 收集组件时不报错
mumu_instance = gr.Dropdown(visible=False)
mumu_background_mode = gr.Checkbox(visible=False)
with gr.Tab("雷电", interactive=has_leidian, id="leidian") as tab_leidian:
gr.Markdown("已选中雷电模拟器")
@ -661,7 +739,7 @@ class KotoneBotUI:
# choices = ['adb', 'adb_raw', 'uiautomator2']
# else:
# raise ValueError(f'Unsupported backend type: {type_in_config}')
choices = ['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows']
choices = ['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows', 'nemu_ipc']
screenshot_impl = gr.Dropdown(
choices=choices,
value=self.current_config.backend.screenshot_impl,
@ -711,6 +789,7 @@ class KotoneBotUI:
if current_tab == 0: # Mumu
self.current_config.backend.type = 'mumu12'
self.current_config.backend.instance_id = data['_mumu_index']
self.current_config.backend.mumu_background_mode = data['mumu_background_mode']
elif current_tab == 1: # Leidian
self.current_config.backend.type = 'leidian'
self.current_config.backend.instance_id = data['_leidian_index']
@ -741,7 +820,8 @@ class KotoneBotUI:
'adb_emulator_name': adb_emulator_name,
'emulator_args': emulator_args,
'_mumu_index': mumu_instance,
'_leidian_index': leidian_instance
'_leidian_index': leidian_instance,
'mumu_background_mode': mumu_background_mode
}
def _create_purchase_settings(self) -> ConfigBuilderReturnValue:
@ -1460,27 +1540,33 @@ class KotoneBotUI:
)
def _create_log_tab(self) -> None:
with gr.Tab("日志"):
gr.Markdown("## 日志")
with gr.Tab("反馈"):
gr.Markdown("## 反馈")
gr.Markdown('脚本报错或者卡住?在这里填写信息可以快速反馈!')
with gr.Column():
report_title = gr.Textbox(label="标题", placeholder="用一句话概括问题")
report_type = gr.Dropdown(label="反馈类型", choices=["bug"], value="bug", interactive=False)
report_description = gr.Textbox(label="描述", lines=5, placeholder="详细描述问题。例如:什么时候出错、是否每次都出错、出错时的步骤是什么")
with gr.Row():
export_dumps_btn = gr.Button("导出 dump")
export_logs_btn = gr.Button("导出日志")
with gr.Row():
save_report_btn = gr.Button("一键导出报告")
upload_report_btn = gr.Button("上传")
save_local_report_btn = gr.Button("保存至本地")
result_text = gr.Markdown("等待操作\n\n\n")
export_dumps_btn.click(
fn=self.export_dumps,
def on_upload_click(title: str, description: str):
yield from _save_bug_report(title, description, self._kaa.version, upload=True)
def on_save_local_click(title: str, description: str):
yield from _save_bug_report(title, description, self._kaa.version, upload=False)
upload_report_btn.click(
fn=on_upload_click,
inputs=[report_title, report_description],
outputs=[result_text]
)
export_logs_btn.click(
fn=self.export_logs,
outputs=[result_text]
)
save_report_btn.click(
fn=partial(_save_bug_report, version=self._kaa.version),
save_local_report_btn.click(
fn=on_save_local_click,
inputs=[report_title, report_description],
outputs=[result_text]
)
@ -1493,6 +1579,7 @@ class KotoneBotUI:
def _create_screen_tab(self) -> None:
with gr.Tab("画面"):
gr.Markdown("## 当前设备画面")
refresh_btn = gr.Button("刷新画面", variant="primary")
WIDTH = 720 // 3
HEIGHT = 1280 // 3
last_update_text = gr.Markdown("上次更新时间:无数据")
@ -1501,14 +1588,14 @@ class KotoneBotUI:
def update_screenshot():
ctx = ContextStackVars.current()
if ctx is None:
return [None, last_update_text.value]
return [None, "上次更新时间:无上下文数据"]
screenshot = ctx._screenshot
if screenshot is None:
return [None, last_update_text.value]
return [None, "上次更新时间:无截图数据"]
screenshot = cv2.cvtColor(screenshot, cv2.COLOR_BGR2RGB)
return screenshot, f"上次更新时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
gr.Timer(0.3).tick(
refresh_btn.click(
fn=update_screenshot,
outputs=[screenshot_display, last_update_text]
)

View File

@ -1,5 +1,6 @@
import io
import os
from typing import Any, Literal, cast
import zipfile
import logging
import traceback
@ -9,6 +10,8 @@ from typing_extensions import override
import cv2
from kotonebot.client.host.mumu12_host import MuMu12HostConfig
from ...client import Device
from kotonebot.ui import user
from kotonebot import KotoneBot
@ -112,6 +115,10 @@ class Kaa(KotoneBot):
if self.backend_instance is None:
raise ValueError('Backend instance is not set.')
_set_instance(self.backend_instance)
from kotonebot import device
logger.info('Set target resolution to 720x1280.')
device.orientation = 'portrait'
device.target_resolution = (720, 1280)
def __get_backend_instance(self, config: UserConfig) -> Instance:
"""
@ -228,12 +235,32 @@ class Kaa(KotoneBot):
else:
raise ValueError(f"Impl of '{impl_name}' is not supported on DMM.")
return self.backend_instance.create_device(impl_name, host_conf)
# 统一处理所有基于 ADB 的后端
elif isinstance(self.backend_instance, (CustomInstance, Mumu12Instance, LeidianInstance)):
if impl_name in ['adb', 'adb_raw', 'uiautomator2']:
host_conf = AdbHostConfig(timeout=180)
if impl_name == 'nemu_ipc' and isinstance(self.backend_instance, Mumu12Instance):
impl_name = cast(Literal['nemu_ipc'], impl_name)
options = cast(BaseConfig, user_config.options)
# 根据 mumu_background_mode 决定是否传递后台保活参数
if user_config.backend.mumu_background_mode:
host_conf = MuMu12HostConfig(
display_id=None,
target_package_name=options.start_game.game_package_name,
app_index=0,
timeout=180
)
else:
host_conf = MuMu12HostConfig(
timeout=180
)
return self.backend_instance.create_device(impl_name, host_conf)
elif impl_name in ['adb', 'adb_raw', 'uiautomator2']:
impl_name = cast(Literal['adb', 'adb_raw', 'uiautomator2'], impl_name)
host_conf = AdbHostConfig(timeout=180)
return self.backend_instance.create_device(
cast(Any, impl_name), # :(
host_conf
)
else:
raise ValueError(f"{user_config.backend.type} backend does not support implementation '{impl_name}'")

View File

@ -6,6 +6,8 @@ from PyQt6.QtCore import Qt
import sys
import cv2
import numpy as np
import tempfile
import os
class HSVRangeTool(QMainWindow):
def __init__(self):
@ -32,9 +34,14 @@ class HSVRangeTool(QMainWindow):
control_layout = QVBoxLayout(control_panel)
# 添加载入图片按钮
load_btn = QPushButton('载入图片')
load_btn.clicked.connect(self.on_load_image)
control_layout.addWidget(load_btn)
load_layout = QHBoxLayout()
open_btn = QPushButton('打开图片')
paste_btn = QPushButton('粘贴图片')
open_btn.clicked.connect(self.on_open_image)
paste_btn.clicked.connect(self.on_paste_image)
load_layout.addWidget(open_btn)
load_layout.addWidget(paste_btn)
control_layout.addLayout(load_layout)
# 在控制面板添加取色器按钮
picker_layout = QHBoxLayout()
@ -156,7 +163,7 @@ class HSVRangeTool(QMainWindow):
(screen_geometry.height() - size.height()) // 2
)
def on_load_image(self):
def on_open_image(self):
file_name, _ = QFileDialog.getOpenFileName(
self,
"选择图片",
@ -170,6 +177,52 @@ class HSVRangeTool(QMainWindow):
self.hsv_image = cv2.cvtColor(self.image, cv2.COLOR_BGR2HSV)
self.update_image()
def on_paste_image(self):
# 从剪贴板获取图片
clipboard = QApplication.clipboard()
if clipboard is None:
QMessageBox.warning(self, "错误", "无法访问剪贴板")
return
mime_data = clipboard.mimeData()
if mime_data is None:
QMessageBox.warning(self, "错误", "无法获取剪贴板数据")
return
if mime_data.hasImage():
# 获取剪贴板中的图片
qt_image = clipboard.image()
if not qt_image.isNull():
# 将Qt图像保存为临时文件
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
temp_path = temp_file.name
# 保存Qt图像为PNG文件
if qt_image.save(temp_path, 'PNG'):
# 使用OpenCV读取临时文件
self.image = cv2.imread(temp_path)
if self.image is not None:
self.hsv_image = cv2.cvtColor(self.image, cv2.COLOR_BGR2HSV)
self.update_image()
else:
QMessageBox.warning(self, "错误", "无法读取图片数据")
# 删除临时文件
try:
os.unlink(temp_path)
except:
pass
else:
QMessageBox.warning(self, "错误", "无法保存图片数据")
try:
os.unlink(temp_path)
except:
pass
else:
QMessageBox.warning(self, "错误", "剪贴板中的图片无效")
else:
QMessageBox.information(self, "提示", "剪贴板中没有图片")
def opencv_hsv_to_qt_hsv(self, h, s, v):
# OpenCV 的 H 范围是 0-179需要转换到 Qt 的 0-359
h = int(h * 2)