feat(core): 新增 Mumu12Host & LeidianHost 模拟器控制

This commit is contained in:
XcantloadX 2025-05-19 10:48:29 +08:00
parent b434278e4e
commit 6b5f972d25
12 changed files with 508 additions and 23 deletions

View File

@ -155,7 +155,7 @@ class KotoneBot:
self.backend_instance = create_custom(
adb_ip=config.backend.adb_ip,
adb_port=config.backend.adb_port,
adb_emulator_name=config.backend.adb_emulator_name,
adb_name=config.backend.adb_emulator_name,
exe_path=exe,
emulator_args=config.backend.emulator_args
)

View File

@ -1 +1,8 @@
from .device import Device
from .device import Device
from .factory import create_device, DeviceImpl
__all__ = [
'Device',
'create_device',
'DeviceImpl',
]

View File

@ -15,12 +15,29 @@ DeviceImpl = Literal['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_window
def create_device(
addr: str,
impl: DeviceImpl,
*,
connect: bool = True,
device_serial: str | None = None
) -> Device:
"""
根据指定的实现方式创建 Device 实例
:param addr: 设备地址 `127.0.0.1:5555`
仅当通过无线方式连接 Android 设备或者使用 `remote_windows` 时有效
:param impl: 实现方式
:param connect: 是否在创建时连接设备默认为 True
仅对 ADB-based 的实现方式有效
:param device_serial: 设备序列号默认为 None
若为非 None则当存在多个设备时通过该值判断是否为目标设备
仅对 ADB-based 的实现方式有效
"""
if impl in ['adb', 'adb_raw', 'uiautomator2']:
result = adb.connect(addr)
if 'cannot connect to' in result:
raise ValueError(result)
d = [d for d in adb.device_list() if d.serial == addr]
if connect:
result = adb.connect(addr)
if 'cannot connect to' in result:
raise ValueError(result)
serial = device_serial or addr
d = [d for d in adb.device_list() if d.serial == serial]
if len(d) == 0:
raise ValueError(f"Device {addr} not found")
d = d[0]

View File

@ -1,4 +1,11 @@
from .protocol import HostProtocol, Instance
from .custom import CustomInstance, create as create_custom
from .mumu12_host import Mumu12Host, Mumu12Instance
from .leidian_host import LeidianHost, LeidianInstance
__all__ = ['HostProtocol', 'Instance', 'CustomInstance', 'create_custom']
__all__ = [
'HostProtocol', 'Instance',
'CustomInstance', 'create_custom',
'Mumu12Host', 'Mumu12Instance',
'LeidianHost', 'LeidianInstance'
]

View File

@ -6,6 +6,7 @@ from typing import Optional, ParamSpec, TypeVar, TypeGuard
from typing_extensions import override
from kotonebot import logging
from kotonebot.client.device import Device
logger = logging.getLogger(__name__)
@ -55,6 +56,10 @@ class CustomInstance(Instance):
else:
return False
@override
def refresh(self):
pass
def __repr__(self) -> str:
return f'CustomInstance(#{self.id}# at "{self.exe_path}" with {self.adb_ip}:{self.adb_port})'
@ -63,8 +68,8 @@ def _type_check(ins: Instance) -> CustomInstance:
raise ValueError(f'Instance {ins} is not a CustomInstance')
return ins
def create(exe_path: str, adb_ip: str, adb_port: int, adb_emulator_name: str, emulator_args: str = "") -> CustomInstance:
return CustomInstance(exe_path, emulator_args=emulator_args, id='custom', name='Custom', adb_ip=adb_ip, adb_port=adb_port, adb_emulator_name=adb_emulator_name)
def create(exe_path: str, adb_ip: str, adb_port: int, adb_name: str, emulator_args: str = "") -> CustomInstance:
return CustomInstance(exe_path, emulator_args=emulator_args, id='custom', name='Custom', adb_ip=adb_ip, adb_port=adb_port, adb_name=adb_name)
if __name__ == '__main__':

View File

@ -0,0 +1,191 @@
import os
import subprocess
from functools import lru_cache
from typing_extensions import override
from kotonebot import logging
from kotonebot.client.device import Device
from kotonebot.util import Countdown, Interval
from .protocol import HostProtocol, Instance, copy_type
logger = logging.getLogger(__name__)
if os.name == 'nt':
from ...interop.win.reg import read_reg
else:
def read_reg(key, subkey, name, *, default=None, **kwargs):
"""Stub for read_reg on non-Windows platforms."""
return default
class LeidianInstance(Instance):
@copy_type(Instance.__init__)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._args = args
self.index: int | None = None
self.is_running: bool = False
@override
def refresh(self):
ins = LeidianHost.query(id=self.id)
assert isinstance(ins, LeidianInstance), f'Expected LeidianInstance, got {type(ins)}'
if ins is not None:
self.adb_port = ins.adb_port
self.adb_ip = ins.adb_ip
self.adb_name = ins.adb_name
self.is_running = ins.is_running
logger.debug('Refreshed Leidian instance: %s', repr(ins))
@override
def start(self):
if self.running():
logger.warning('Instance is already running.')
return
logger.info('Starting Leidian instance %s', self)
LeidianHost._invoke_manager(['launch', '--index', str(self.index)])
self.refresh()
@override
def stop(self):
if not self.running():
logger.warning('Instance is not running.')
return
logger.info('Stopping Leidian instance id=%s name=%s...', self.id, self.name)
LeidianHost._invoke_manager(['quit', '--index', str(self.index)])
self.refresh()
@override
def wait_available(self, timeout: float = 180):
cd = Countdown(timeout)
it = Interval(5)
while not cd.expired() and not self.running():
it.wait()
if not self.running():
raise TimeoutError(f'Leidian instance "{self.name}" is not available.')
@override
def running(self) -> bool:
result = LeidianHost._invoke_manager(['isrunning', '--index', str(self.index)])
return result.strip() == 'running'
@override
def create_device(self) -> Device:
raise NotImplementedError('CustomInstance does not support create_device.')
class LeidianHost(HostProtocol):
@staticmethod
@lru_cache(maxsize=1)
def _read_install_path() -> str | None:
"""
从注册表中读取雷电模拟器的安装路径
:return: 安装路径如果未找到则返回 None
"""
if os.name != 'nt':
return None
try:
icon_path = read_reg('HKCU', r'Software\leidian\LDPlayer9', 'DisplayIcon', default=None)
if icon_path and isinstance(icon_path, str):
icon_path = icon_path.replace('"', '')
path = os.path.dirname(icon_path)
logger.debug('Leidian installation path (from DisplayIcon): %s', path)
return path
install_dir = read_reg('HKCU', r'Software\leidian\LDPlayer9', 'InstallDir', default=None)
if install_dir and isinstance(install_dir, str):
install_dir = install_dir.replace('"', '')
logger.debug('Leidian installation path (from InstallDir): %s', install_dir)
return install_dir
except Exception as e:
logger.error(f'Failed to read Leidian installation path from registry: {e}')
return None
@staticmethod
def _invoke_manager(args: list[str]) -> str:
"""
调用 ldconsole.exe
参考文档https://www.ldmnq.com/forum/30.html以及命令行帮助
另外还有个 ld.exe封装了 adb.exe可以直接执行 adb 命令https://www.ldmnq.com/forum/9178.html
:param args: 命令行参数列表
:return: 命令执行的输出
"""
install_path = LeidianHost._read_install_path()
if install_path is None:
raise RuntimeError('Leidian is not installed.')
manager_path = os.path.join(install_path, 'ldconsole.exe')
logger.debug('ldconsole execute: %s', repr(args))
output = subprocess.run(
[manager_path] + args,
capture_output=True,
text=True,
# encoding='utf-8', # 居然不是 utf-8 编码
# https://stackoverflow.com/questions/6011235/run-a-program-from-python-and-have-it-continue-to-run-after-the-script-is-kille
creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP
)
if output.returncode != 0:
raise RuntimeError(f'Failed to invoke ldconsole: {output.stderr}')
return output.stdout
@staticmethod
def installed() -> bool:
return LeidianHost._read_install_path() is not None
@staticmethod
def list() -> list[Instance]:
output = LeidianHost._invoke_manager(['list2'])
instances = []
# 解析 list2 命令的输出
# 格式: 索引,标题,顶层窗口句柄,绑定窗口句柄,是否进入android,进程PID,VBox进程PID
for line in output.strip().split('\n'):
if not line:
continue
parts = line.split(',')
if len(parts) < 5:
logger.warning(f'Invalid list2 output line: {line}')
continue
index = parts[0]
name = parts[1]
is_android_started = parts[4] == '1'
# 端口号规则 https://help.ldmnq.com/docs/LD9adbserver#a67730c2e7e2e0400d40bcab37d0e0cf
adb_port = 5554 + (int(index) * 2)
instance = LeidianInstance(
id=index,
name=name,
adb_port=adb_port,
adb_ip='127.0.0.1',
adb_name=f'emulator-{adb_port}'
)
instance.index = int(index)
instance.is_running = is_android_started
logger.debug('Leidian instance: %s', repr(instance))
instances.append(instance)
return instances
@staticmethod
def query(*, id: str) -> Instance | None:
instances = LeidianHost.list()
for instance in instances:
if instance.id == id:
return instance
return None
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
print(LeidianHost._read_install_path())
print(LeidianHost.installed())
print(LeidianHost.list())
print(ins:=LeidianHost.query(id='0'))
assert isinstance(ins, LeidianInstance)
ins.start()
ins.wait_available()
print('status', ins.running(), ins.adb_port, ins.adb_ip)
# ins.stop()
# print('status', ins.running(), ins.adb_port, ins.adb_ip)

View File

@ -0,0 +1,168 @@
import os
import subprocess
from functools import lru_cache
from typing import Any
from typing_extensions import override
from kotonebot import logging
from kotonebot.util import Countdown, Interval
from .protocol import HostProtocol, Instance, copy_type
logger = logging.getLogger(__name__)
if os.name == 'nt':
from ...interop.win.reg import read_reg
else:
def read_reg(key, subkey, name, *, default=None, **kwargs):
"""Stub for read_reg on non-Windows platforms."""
return default
class Mumu12Instance(Instance):
@copy_type(Instance.__init__)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._args = args
self.index: int | None = None
self.is_android_started: bool = False
@override
def refresh(self):
ins = Mumu12Host.query(id=self.id)
assert isinstance(ins, Mumu12Instance), f'Expected Mumu12Instance, got {type(ins)}'
if ins is not None:
self.adb_port = ins.adb_port
self.adb_ip = ins.adb_ip
self.adb_name = ins.adb_name
self.is_android_started = ins.is_android_started
logger.debug('Refreshed MuMu12 instance: %s', repr(ins))
@override
def start(self):
if self.running():
logger.warning('Instance is already running.')
return
logger.info('Starting MuMu12 instance %s', self)
Mumu12Host._invoke_manager(['control', '-v', self.id, 'launch'])
self.refresh()
@override
def stop(self):
if not self.running():
logger.warning('Instance is not running.')
return
logger.info('Stopping MuMu12 instance id=%s name=%s...', self.id, self.name)
Mumu12Host._invoke_manager(['control', '-v', self.id, 'shutdown'])
self.refresh()
@override
def wait_available(self, timeout: float = 180):
cd = Countdown(timeout)
it = Interval(5)
while not cd.expired() and not self.running():
it.wait()
self.refresh()
if not self.running():
raise TimeoutError(f'MuMu12 instance "{self.name}" is not available.')
@override
def running(self) -> bool:
return self.is_android_started
class Mumu12Host(HostProtocol):
@staticmethod
@lru_cache(maxsize=1)
def _read_install_path() -> str | None:
"""
Reads the installation path (DisplayIcon) of MuMu Player 12 from the registry.
:return: The path to the display icon if found, otherwise None.
"""
if os.name != 'nt':
return None
uninstall_subkeys = [
r'SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\MuMuPlayer-12.0',
r'SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\MuMuPlayerGlobal-12.0'
]
for subkey in uninstall_subkeys:
icon_path = read_reg('HKLM', subkey, 'DisplayIcon', default=None)
if icon_path and isinstance(icon_path, str):
icon_path = icon_path.replace('"', '')
path = os.path.dirname(icon_path)
logger.debug('MuMu Player 12 installation path: %s', path)
return path
return None
@staticmethod
def _invoke_manager(args: list[str]) -> str:
"""
调用 MuMuManager.exe
:param args: 命令行参数列表
:return: 命令执行的输出
"""
install_path = Mumu12Host._read_install_path()
if install_path is None:
raise RuntimeError('MuMu Player 12 is not installed.')
manager_path = os.path.join(install_path, 'MuMuManager.exe')
logger.debug('MuMuManager execute: %s', repr(args))
output = subprocess.run(
[manager_path] + args,
capture_output=True,
text=True,
encoding='utf-8',
# https://stackoverflow.com/questions/6011235/run-a-program-from-python-and-have-it-continue-to-run-after-the-script-is-kille
creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP
)
if output.returncode != 0:
raise RuntimeError(f'Failed to invoke MuMuManager: {output.stderr}')
return output.stdout
@staticmethod
def installed() -> bool:
return Mumu12Host._read_install_path() is not None
@staticmethod
def list() -> list[Instance]:
output = Mumu12Host._invoke_manager(['info', '-v', 'all'])
import json
try:
data: dict[str, dict[str, Any]] = json.loads(output) # Assuming the output is a JSON array
instances = []
for index, instance_data in data.items():
instance = Mumu12Instance(
id=index,
name=instance_data['name'],
adb_port=instance_data.get('adb_port'),
adb_ip=instance_data.get('adb_host_ip', '127.0.0.1'),
adb_name=None
)
instance.index = int(index)
instance.is_android_started = instance_data.get('is_android_started', False)
logger.debug('Mumu12 instance: %s', repr(instance))
instances.append(instance)
return instances
except json.JSONDecodeError as e:
raise RuntimeError(f'Failed to parse output: {e}')
@staticmethod
def query(*, id: str) -> Instance | None:
instances = Mumu12Host.list()
for instance in instances:
if instance.id == id:
return instance
return None
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s] [%(lineno)d] %(message)s')
print(Mumu12Host._read_install_path())
print(Mumu12Host.installed())
print(Mumu12Host.list())
print(ins:=Mumu12Host.query(id='2'))
assert isinstance(ins, Mumu12Instance)
ins.start()
ins.wait_available()
print('status', ins.running(), ins.adb_port, ins.adb_ip)
ins.stop()
print('status', ins.running(), ins.adb_port, ins.adb_ip)

View File

@ -1,15 +1,21 @@
import time
import socket
from typing import Protocol
from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing_extensions import ParamSpec, Concatenate
from typing import Callable, TypeVar, Generic, Protocol, runtime_checkable, Type, Any
from adbutils import adb, AdbTimeout, AdbError
from adbutils._device import AdbDevice
from kotonebot import logging
from kotonebot.client import Device, create_device, DeviceImpl
from kotonebot.util import Countdown, Interval
logger = logging.getLogger(__name__)
# https://github.com/python/typing/issues/769#issuecomment-903760354
_T = TypeVar("_T")
def copy_type(_: _T) -> Callable[[Any], _T]:
return lambda x: x
def tcp_ping(host: str, port: int, timeout: float = 1.0) -> bool:
"""
@ -30,34 +36,75 @@ def tcp_ping(host: str, port: int, timeout: float = 1.0) -> bool:
return False
@dataclass
class Instance:
id: str
name: str
adb_port: int
adb_ip: str = '127.0.0.1'
adb_emulator_name: str = 'emulator-5554'
class Instance(ABC):
def __init__(self,
id: str,
name: str,
adb_port: int | None = None,
adb_ip: str = '127.0.0.1',
adb_name: str | None = None
):
self.id: str = id
self.name: str = name
self.adb_port: int | None = adb_port
self.adb_ip: str = adb_ip
self.adb_name: str | None = adb_name
def require_adb_port(self) -> int:
if self.adb_port is None:
raise ValueError("ADB port is not set and is required.")
return self.adb_port
@abstractmethod
def refresh(self):
"""
刷新实例信息 ADB 端口号等
"""
raise NotImplementedError()
@abstractmethod
def start(self):
"""
启动模拟器实例
"""
raise NotImplementedError()
@abstractmethod
def stop(self):
"""
停止模拟器实例
"""
raise NotImplementedError()
@abstractmethod
def running(self) -> bool:
raise NotImplementedError()
def create_device(self, impl: DeviceImpl) -> Device:
"""
创建 Device 实例可用于控制模拟器系统
:return: Device 实例
"""
if self.adb_port is None:
raise ValueError("ADB port is not set and is required.")
return create_device(
addr=f'{self.adb_ip}:{self.adb_port}',
impl=impl,
device_serial=self.adb_name,
)
def wait_available(self, timeout: float = 180):
logger.info('Starting to wait for emulator %s(127.0.0.1:%d) to be available...', self.name, self.adb_port)
state = 0
port = self.adb_port
emulator_name = self.adb_emulator_name
port = self.require_adb_port()
emulator_name = self.adb_name
cd = Countdown(timeout)
it = Interval(1)
d: AdbDevice | None = None
while True:
if cd.expired():
raise TimeoutError(f"Emulator {self.name} is not available.")
raise TimeoutError(f'Emulator "{self.name}" is not available.')
it.wait()
try:
match state:
@ -121,12 +168,18 @@ class Instance:
time.sleep(1)
logger.info('Emulator %s(127.0.0.1:%d) now is available.', self.name, self.adb_port)
def __repr__(self) -> str:
return f'{self.__class__.__name__}(name="{self.name}", id="{self.id}", adb="{self.adb_ip}:{self.adb_port}"({self.adb_name}))'
class HostProtocol(Protocol):
@staticmethod
def installed() -> bool: ...
@staticmethod
def list() -> list[Instance]: ...
@staticmethod
def query(*, id: str) -> Instance | None: ...
if __name__ == '__main__':

View File

@ -32,7 +32,7 @@ class AdbImpl(Commandable, Touchable, Screenshotable):
if result_text == '':
logger.error("No current package found")
return None
_, activity, _, pid = result_text.split(' ')
_, activity, *_ = result_text.split(' ')
package = activity.split('/')[0]
return package

View File

@ -0,0 +1,37 @@
import winreg
from typing import Any, Literal
RegKey = Literal["HKLM", "HKCU", "HKCR", "HKU", "HKCC"]
def read_reg(key: RegKey, subkey: str, name: str, *, default: Any = None) -> Any:
"""
读取注册表项的值
:param key: 注册表键例如 "HKLM" (HKEY_LOCAL_MACHINE), "HKCU" (HKEY_CURRENT_USER)
:param subkey: 注册表子键的路径
:param name: 要读取的值的名称
:param default: 如果注册表项不存在时返回的默认值
:return: 注册表项的值如果不存在则返回默认值
"""
try:
hkey = {
"HKLM": winreg.HKEY_LOCAL_MACHINE,
"HKCU": winreg.HKEY_CURRENT_USER,
"HKCR": winreg.HKEY_CLASSES_ROOT,
"HKU": winreg.HKEY_USERS,
"HKCC": winreg.HKEY_CURRENT_CONFIG,
}[key]
except KeyError:
raise ValueError(f"Invalid key: {key}")
try:
with winreg.OpenKey(hkey, subkey) as key_handle:
value, _ = winreg.QueryValueEx(key_handle, name)
return value
except FileNotFoundError:
return default
except OSError as e:
if e.winerror == 2: # 注册表项不存在
return default
else:
raise # 其他 OSError 异常,例如权限问题,重新抛出

View File

@ -159,7 +159,7 @@ def resume_produce():
current_week = int(weeks[0])
break
retry_count += 1
logger.warning(f'Failed to detect weeks. Retrying... ({retry_count}/{max_retries})')
logger.warning(f'Failed to detect weeks. week_text="{week_text}". Retrying... ({retry_count}/{max_retries})')
sleep(0.5)
device.screenshot()