feat(core): 新增 screencap raw 截图方法

This commit is contained in:
XcantloadX 2025-02-14 17:33:11 +08:00
parent 0077062263
commit cb0e6decae
5 changed files with 127 additions and 13 deletions

View File

@ -691,7 +691,7 @@ class Context(Generic[T]):
ip = self.config.current.backend.adb_ip
port = self.config.current.backend.adb_port
# TODO: 处理链接失败情况
self.__device = ContextDevice(create_device(f'{ip}:{port}', 'uiautomator2'))
self.__device = ContextDevice(create_device(f'{ip}:{port}', 'adb_raw'))
def inject(
self,

View File

@ -351,6 +351,7 @@ class Device:
if __name__ == "__main__":
from kotonebot.client.implements.adb import AdbImpl
from kotonebot.client.implements.adb_raw import AdbRawImpl
from .implements.uiautomator2 import UiAutomator2Impl
print("server version:", adb.server_version())
adb.connect("127.0.0.1:5555")
@ -358,14 +359,12 @@ if __name__ == "__main__":
d = adb.device_list()[-1]
d.shell("dumpsys activity top | grep ACTIVITY | tail -n 1")
dd = Device(d)
adb_imp = AdbImpl(dd)
adb_imp = AdbRawImpl(dd)
dd._command = adb_imp
dd._touch = adb_imp
# dd._screenshot = adb_imp
dd._screenshot = adb_imp
# dd._screenshot = MinicapScreenshotImpl(dd)
dd._screenshot = UiAutomator2Impl(dd)
# dd.launch_app("com.android.settings")
dd.adb = d
# dd._screenshot = UiAutomator2Impl(dd)
# 实时展示画面
import cv2
@ -389,8 +388,8 @@ if __name__ == "__main__":
# 在图像上绘制信息
font = cv2.FONT_HERSHEY_SIMPLEX
cv2.putText(img, current_time, (10, 30), font, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
cv2.putText(img, fps_text, (10, 60), font, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
cv2.putText(img, current_time, (10, 30), font, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.putText(img, fps_text, (10, 60), font, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.imshow("screen", img)
cv2.waitKey(1)

View File

@ -3,11 +3,12 @@ from typing import Literal
from .device import Device
from .implements.adb import AdbImpl
from .implements.adb_raw import AdbRawImpl
from .implements.uiautomator2 import UiAutomator2Impl
from adbutils import adb
DeviceImpl = Literal['adb', 'uiautomator2']
DeviceImpl = Literal['adb', 'adb_raw', 'uiautomator2']
def create_device(
addr: str,
@ -23,6 +24,10 @@ def create_device(
device._command = AdbImpl(device)
device._touch = AdbImpl(device)
device._screenshot = AdbImpl(device)
elif impl == 'adb_raw':
device._command = AdbRawImpl(device)
device._touch = AdbRawImpl(device)
device._screenshot = AdbRawImpl(device)
elif impl == 'uiautomator2':
device._command = UiAutomator2Impl(device)
device._touch = UiAutomator2Impl(device)

View File

@ -0,0 +1,104 @@
import os
import time
import subprocess
import struct
from threading import Thread
from functools import cached_property
from typing_extensions import override
import cv2
import numpy as np
from cv2.typing import MatLike
from adbutils._utils import adb_path
from .adb import AdbImpl
from ..device import Device
from kotonebot import logging
logger = logging.getLogger(__name__)
WAIT_TIMEOUT = 10
SCRIPT = """#!/bin/sh
while true; do
screencap
sleep 0.3
done
"""
class AdbRawImpl(AdbImpl):
def __init__(self, device: Device):
super().__init__(device)
self.__worker: Thread | None = None
self.__process: subprocess.Popen | None = None
self.__data: MatLike | None = None
def __worker_thread(self) -> None:
with open('screenshot.sh', 'w', encoding='utf-8', newline='\n') as f:
f.write(SCRIPT)
self.adb.push('screenshot.sh', '/data/local/tmp/screenshot.sh')
self.adb.shell(f'chmod 755 /data/local/tmp/screenshot.sh')
os.remove('screenshot.sh')
cmd = fr'{adb_path()} -s {self.adb.serial} exec-out "sh /data/local/tmp/screenshot.sh"'
self.__process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
while self.__process.poll() is None:
if self.__process.stdout is None:
logger.error("Failed to get stdout from process")
continue
# 解析 header
# https://stackoverflow.com/questions/22034959/what-format-does-adb-screencap-sdcard-screenshot-raw-produce-without-p-f
if self.__api_level >= 26:
metadata = self.__process.stdout.read(16)
w, h, p, c = struct.unpack('<IIII', metadata)
# w=width, h=height, p=pixel_format, c=color_space
# 详见https://android.googlesource.com/platform/frameworks/base/+/26a2b97dbe48ee45e9ae70110714048f2f360f97/cmds/screencap/screencap.cpp#209
else:
metadata = self.__process.stdout.read(12)
w, h, p = struct.unpack('<III', metadata)
if p == 1: # PixelFormat.RGBA_8888
channel = 4
else:
raise ValueError(f"Unsupported pixel format: {p}")
data_size = w * h * channel
if (data_size < 100 * 100 * 4) or (data_size > 3000 * 3000 * 4):
raise ValueError(f"Invaild data_size: {w}x{h}.")
# 读取图像数据
# logger.verbose(f"receiving image data: {w}x{h} {data_size} bytes")
image_data = self.__process.stdout.read(data_size)
if not isinstance(image_data, bytes) or len(image_data) != data_size:
logger.error(f"Failed to read image data, expected {data_size} bytes but got {len(image_data) if isinstance(image_data, bytes) else 'non-bytes'}")
continue
np_data = np.frombuffer(image_data, np.uint8)
np_data = np_data.reshape(h, w, channel)
self.__data = cv2.cvtColor(np_data, cv2.COLOR_RGBA2BGR)
@cached_property
def __api_level(self) -> int:
try:
output = self.adb.shell("getprop ro.build.version.sdk")
assert isinstance(output, str)
return int(output.strip())
except Exception as e:
logger.error(f"Failed to get API level: {e}")
return 0
@override
def screenshot(self) -> MatLike:
if not self.__worker:
self.__worker = Thread(target=self.__worker_thread, daemon=True)
self.__worker.start()
start_time = time.time()
while self.__data is None:
time.sleep(0.01)
if time.time() - start_time > WAIT_TIMEOUT:
raise TimeoutError("Failed to get screenshot from device.")
logger.verbose(f"adb raw screenshot wait time: {time.time() - start_time:.4f}s")
data = self.__data
self.__data = None
return data

View File

@ -1,13 +1,16 @@
import time
from typing import Literal
from ..device import Device
from ..protocol import Screenshotable, Commandable, Touchable
import numpy as np
import uiautomator2 as u2
from cv2.typing import MatLike
from kotonebot import logging
from ..device import Device
from ..protocol import Screenshotable, Commandable, Touchable
logger = logging.getLogger(__name__)
SCREENSHOT_INTERVAL = 0.2
class UiAutomator2Impl(Screenshotable, Commandable, Touchable):
@ -23,8 +26,10 @@ class UiAutomator2Impl(Screenshotable, Commandable, Touchable):
from kotonebot import sleep
delta = time.time() - self.__last_screenshot_time
if delta < SCREENSHOT_INTERVAL:
sleep(SCREENSHOT_INTERVAL - delta)
time.sleep(SCREENSHOT_INTERVAL - delta)
start_time = time.time()
image = self.u2_client.screenshot(format='opencv')
logger.verbose(f'uiautomator2 screenshot: {time.time() - start_time}s')
self.__last_screenshot_time = time.time()
assert isinstance(image, np.ndarray)
return image
@ -62,6 +67,7 @@ class UiAutomator2Impl(Screenshotable, Commandable, Touchable):
"""
try:
result = self.u2_client.app_current()
logger.verbose(f'uiautomator2 current_package: {result}')
return result['package']
except:
return None