Merge branch 'dev'

This commit is contained in:
XcantloadX 2025-04-26 17:04:21 +08:00
commit 622667e0e7
30 changed files with 833 additions and 199 deletions

View File

@ -18,6 +18,12 @@ default:
fetch-submodule:
git submodule update --init --remote --recursive
resource:
python tools\make_resources.py
devtool:
cd kotonebot-devtool; npm run dev
# Check and create virtual environment
env: fetch-submodule
#!{{shebang_pwsh}}

View File

Before

Width:  |  Height:  |  Size: 1.1 MiB

After

Width:  |  Height:  |  Size: 1.1 MiB

View File

Before

Width:  |  Height:  |  Size: 953 KiB

After

Width:  |  Height:  |  Size: 953 KiB

View File

@ -0,0 +1 @@
{"definitions":{"8ded6c98-85ea-4858-a66d-4fc8caecb7c5":{"name":"InPurodyuusu.ButtonIconOuting","displayName":"行动按钮图标 外出(おでかけ)","type":"template","annotationId":"8ded6c98-85ea-4858-a66d-4fc8caecb7c5","useHintRect":false},"d83f338d-dde3-494b-9bea-cae511e3517c":{"name":"InPurodyuusu.ButtonIconConsult","displayName":"行动按钮图标 咨询(相談)","type":"template","annotationId":"d83f338d-dde3-494b-9bea-cae511e3517c","useHintRect":false}},"annotations":[{"id":"8ded6c98-85ea-4858-a66d-4fc8caecb7c5","type":"rect","data":{"x1":233,"y1":962,"x2":316,"y2":1037}},{"id":"d83f338d-dde3-494b-9bea-cae511e3517c","type":"rect","data":{"x1":405,"y1":963,"x2":488,"y2":1043}}]}

View File

@ -1 +0,0 @@
{"definitions":{"8ded6c98-85ea-4858-a66d-4fc8caecb7c5":{"name":"InPurodyuusu.ButtonIconOuting","displayName":"行动按钮图标 外出(おでかけ)","type":"template","annotationId":"8ded6c98-85ea-4858-a66d-4fc8caecb7c5","useHintRect":false}},"annotations":[{"id":"8ded6c98-85ea-4858-a66d-4fc8caecb7c5","type":"rect","data":{"x1":233,"y1":962,"x2":316,"y2":1037}}]}

Binary file not shown.

After

Width:  |  Height:  |  Size: 601 KiB

View File

@ -0,0 +1 @@
{"definitions":{"23d88465-65d9-4718-8725-8dbf0a98a5a4":{"name":"InPurodyuusu.IconTitleConsult","displayName":"「相談」页面左上角图标","type":"template","annotationId":"23d88465-65d9-4718-8725-8dbf0a98a5a4","useHintRect":false},"1fb9bd7a-52f6-43c6-9b13-a05b66ecae42":{"name":"InPurodyuusu.PointConsultFirstItem","displayName":"「相談」中第一个物品位置","type":"hint-point","annotationId":"1fb9bd7a-52f6-43c6-9b13-a05b66ecae42","useHintRect":false},"9fd0753f-c607-4d49-82d1-40bda27e014f":{"name":"InPurodyuusu.ButtonEndConsult","displayName":"相談 结束按钮","type":"template","annotationId":"9fd0753f-c607-4d49-82d1-40bda27e014f","useHintRect":false}},"annotations":[{"id":"23d88465-65d9-4718-8725-8dbf0a98a5a4","type":"rect","data":{"x1":74,"y1":79,"x2":150,"y2":145}},{"id":"1fb9bd7a-52f6-43c6-9b13-a05b66ecae42","type":"point","data":{"x":123,"y":550}},{"id":"9fd0753f-c607-4d49-82d1-40bda27e014f","type":"rect","data":{"x1":587,"y1":1062,"x2":703,"y2":1109}}]}

Binary file not shown.

After

Width:  |  Height:  |  Size: 622 KiB

View File

@ -0,0 +1 @@
{"definitions":{"4096cffa-a889-4622-852e-760fc7022d93":{"name":"InPurodyuusu.ButtonIconExchange","displayName":"交换按钮的图标","type":"template","annotationId":"4096cffa-a889-4622-852e-760fc7022d93","useHintRect":false}},"annotations":[{"id":"4096cffa-a889-4622-852e-760fc7022d93","type":"rect","data":{"x1":258,"y1":1066,"x2":303,"y2":1108}}]}

Binary file not shown.

After

Width:  |  Height:  |  Size: 314 KiB

View File

@ -0,0 +1 @@
{"definitions":{"25f00ee3-8dfe-42d1-a67e-191fa5c3df4b":{"name":"InPurodyuusu.TextExchangeConfirm","displayName":"交換確認","type":"template","annotationId":"25f00ee3-8dfe-42d1-a67e-191fa5c3df4b","useHintRect":false,"description":"咨询中,购买确认对话框的标题"}},"annotations":[{"id":"25f00ee3-8dfe-42d1-a67e-191fa5c3df4b","type":"rect","data":{"x1":53,"y1":612,"x2":191,"y2":652}}]}

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 418 KiB

View File

@ -0,0 +1 @@
{"definitions":{"3c8b477a-8eda-407e-9e9f-7540c8808f89":{"name":"Produce.ResumeDialogMaster","displayName":"培育再开对话框 MASTER","type":"template","annotationId":"3c8b477a-8eda-407e-9e9f-7540c8808f89","useHintRect":false}},"annotations":[{"id":"3c8b477a-8eda-407e-9e9f-7540c8808f89","type":"rect","data":{"x1":406,"y1":510,"x2":492,"y2":533}}]}

Binary file not shown.

After

Width:  |  Height:  |  Size: 474 KiB

View File

@ -207,6 +207,7 @@ class KotoneBot:
"""
self.check_backend()
init_context(config_type=self.config_type)
vars.interrupted.clear()
if by_priority:
tasks = sorted(tasks, key=lambda x: x.priority, reverse=True)
@ -249,7 +250,14 @@ class KotoneBot:
def run_all(self) -> None:
return self.run(list(task_registry.values()), by_priority=True)
def start_all(self) -> RunStatus:
def start(self, tasks: list[Task], *, by_priority: bool = True) -> RunStatus:
"""
在单独的线程中按优先级顺序运行指定的任务
:param tasks: 要运行的任务列表
:param by_priority: 是否按优先级排序
:return: 运行状态对象
"""
run_status = RunStatus(running=True)
def _on_finished():
run_status.running = False
@ -271,7 +279,14 @@ class KotoneBot:
self.events.task_status_changed += _on_task_status_changed
self.events.finished += _on_finished
thread = threading.Thread(target=self.run_all)
thread = threading.Thread(target=lambda: self.run(tasks, by_priority=by_priority))
thread.start()
return run_status
def start_all(self) -> RunStatus:
"""
在单独的线程中运行所有任务
:return: 运行状态对象
"""
return self.start(list(task_registry.values()), by_priority=True)

View File

@ -4,12 +4,13 @@ from typing import Literal
from .implements.adb import AdbImpl
from .implements.adb_raw import AdbRawImpl
from .implements.windows import WindowsImpl
from .implements.remote_windows import RemoteWindowsImpl
from .implements.uiautomator2 import UiAutomator2Impl
from .device import Device, AndroidDevice, WindowsDevice
from adbutils import adb
DeviceImpl = Literal['adb', 'adb_raw', 'uiautomator2', 'windows']
DeviceImpl = Literal['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows']
def create_device(
addr: str,
@ -40,4 +41,18 @@ def create_device(
device = WindowsDevice()
device._touch = WindowsImpl(device)
device._screenshot = WindowsImpl(device)
elif impl == 'remote_windows':
# For remote_windows, addr should be in the format 'host:port'
if ':' not in addr:
raise ValueError(f"Invalid address format for remote_windows: {addr}. Expected format: 'host:port'")
host, port_str = addr.split(':', 1)
try:
port = int(port_str)
except ValueError:
raise ValueError(f"Invalid port in address: {port_str}")
device = WindowsDevice()
remote_impl = RemoteWindowsImpl(device, host, port)
device._touch = remote_impl
device._screenshot = remote_impl
return device

View File

@ -0,0 +1,197 @@
"""
Remote Windows implementation using XML-RPC.
This module provides:
1. RemoteWindowsImpl - Client implementation that connects to a remote Windows machine
2. RemoteWindowsServer - Server implementation that exposes a WindowsImpl instance via XML-RPC
"""
import io
import base64
import logging
import xmlrpc.client
import xmlrpc.server
from typing import Literal, cast, Any, Tuple
from functools import cached_property
from threading import Thread
import cv2
import numpy as np
from cv2.typing import MatLike
from kotonebot import logging
from ..device import Device, WindowsDevice
from ..protocol import Touchable, Screenshotable
from .windows import WindowsImpl
logger = logging.getLogger(__name__)
def _encode_image(image: MatLike) -> str:
"""Encode an image as a base64 string."""
success, buffer = cv2.imencode('.png', image)
if not success:
raise RuntimeError("Failed to encode image")
return base64.b64encode(buffer.tobytes()).decode('ascii')
def _decode_image(encoded_image: str) -> MatLike:
"""Decode a base64 string to an image."""
buffer = base64.b64decode(encoded_image)
image = cv2.imdecode(np.frombuffer(buffer, np.uint8), cv2.IMREAD_COLOR)
if image is None:
raise RuntimeError("Failed to decode image")
return image
class RemoteWindowsServer:
"""
XML-RPC server that exposes a WindowsImpl instance.
This class wraps a WindowsImpl instance and exposes its methods via XML-RPC.
"""
def __init__(self, host="localhost", port=8000):
"""Initialize the server with the given host and port."""
self.host = host
self.port = port
self.server = None
self.device = WindowsDevice()
self.impl = WindowsImpl(self.device)
self.device._screenshot = self.impl
self.device._touch = self.impl
def start(self):
"""Start the XML-RPC server."""
self.server = xmlrpc.server.SimpleXMLRPCServer(
(self.host, self.port),
logRequests=True,
allow_none=True
)
self.server.register_instance(self)
logger.info(f"Starting RemoteWindowsServer on {self.host}:{self.port}")
self.server.serve_forever()
def start_in_thread(self):
"""Start the XML-RPC server in a separate thread."""
thread = Thread(target=self.start, daemon=True)
thread.start()
return thread
# Screenshotable methods
def screenshot(self) -> str:
"""Take a screenshot and return it as a base64-encoded string."""
try:
image = self.impl.screenshot()
return _encode_image(image)
except Exception as e:
logger.error(f"Error taking screenshot: {e}")
raise
def get_screen_size(self) -> tuple[int, int]:
"""Get the screen size."""
return self.impl.screen_size
def detect_orientation(self) -> str | None:
"""Detect the screen orientation."""
return self.impl.detect_orientation()
# Touchable methods
def click(self, x: int, y: int) -> bool:
"""Click at the given coordinates."""
try:
self.impl.click(x, y)
return True
except Exception as e:
logger.error(f"Error clicking at ({x}, {y}): {e}")
return False
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float | None = None) -> bool:
"""Swipe from (x1, y1) to (x2, y2)."""
try:
self.impl.swipe(x1, y1, x2, y2, duration)
return True
except Exception as e:
logger.error(f"Error swiping from ({x1}, {y1}) to ({x2}, {y2}): {e}")
return False
# Other methods
def get_scale_ratio(self) -> float:
"""Get the scale ratio."""
return self.impl.scale_ratio
def ping(self) -> bool:
"""Check if the server is alive."""
return True
class RemoteWindowsImpl(Touchable, Screenshotable):
"""
Client implementation that connects to a remote Windows machine via XML-RPC.
This class implements the same interfaces as WindowsImpl but forwards all
method calls to a remote server.
"""
def __init__(self, device: Device, host="localhost", port=8000):
"""Initialize the client with the given device, host, and port."""
self.device = device
self.host = host
self.port = port
self.proxy = xmlrpc.client.ServerProxy(
f"http://{host}:{port}/",
allow_none=True
)
# Test connection
try:
if not self.proxy.ping():
raise ConnectionError(f"Failed to connect to RemoteWindowsServer at {host}:{port}")
logger.info(f"Connected to RemoteWindowsServer at {host}:{port}")
except Exception as e:
raise ConnectionError(f"Failed to connect to RemoteWindowsServer at {host}:{port}: {e}")
@cached_property
def scale_ratio(self) -> float:
"""Get the scale ratio from the remote server."""
return cast(float, self.proxy.get_scale_ratio())
@property
def screen_size(self) -> tuple[int, int]:
"""Get the screen size from the remote server."""
return cast(Tuple[int, int], self.proxy.get_screen_size())
def detect_orientation(self) -> None | Literal['portrait'] | Literal['landscape']:
"""Detect the screen orientation from the remote server."""
return cast(None | Literal['portrait'] | Literal['landscape'], self.proxy.detect_orientation())
def screenshot(self) -> MatLike:
"""Take a screenshot from the remote server."""
encoded_image = cast(str, self.proxy.screenshot())
return _decode_image(encoded_image)
def click(self, x: int, y: int) -> None:
"""Click at the given coordinates on the remote server."""
if not self.proxy.click(x, y):
raise RuntimeError(f"Failed to click at ({x}, {y})")
def swipe(self, x1: int, y1: int, x2: int, y2: int, duration: float | None = None) -> None:
"""Swipe from (x1, y1) to (x2, y2) on the remote server."""
if not self.proxy.swipe(x1, y1, x2, y2, duration):
raise RuntimeError(f"Failed to swipe from ({x1}, {y1}) to ({x2}, {y2})")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Remote Windows XML-RPC Server")
parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
parser.add_argument("--port", type=int, default=8000, help="Port to bind to")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
server = RemoteWindowsServer(args.host, args.port)
try:
server.start()
except KeyboardInterrupt:
logger.info("Server stopped by user")

View File

@ -22,9 +22,11 @@ class BackendConfig(ConfigBaseModel):
雷电模拟器需要设置正确的模拟器名否则 自动启动模拟器 功能将无法正常工作
其他功能不受影响
"""
screenshot_impl: Literal['adb', 'adb_raw', 'uiautomator2', 'windows'] = 'adb'
screenshot_impl: Literal['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows'] = 'adb'
"""
截图方法暂时推荐使用adb截图方式
如果使用 remote_windows需要在 adb_ip 中填写远程 Windows IP 地址 adb_port 中填写远程 Windows 的端口号
"""
check_emulator: bool = False
"""
@ -72,7 +74,7 @@ class UserConfig(ConfigBaseModel, Generic[T]):
class RootConfig(ConfigBaseModel, Generic[T]):
version: int = 3
version: int = 4
"""配置版本。"""
user_configs: list[UserConfig[T]] = []
"""用户配置。"""

View File

@ -20,10 +20,13 @@ class ConfigEnum(Enum):
return self.value[1]
class Priority(IntEnum):
"""
任务优先级数字越大优先级越高越先执行
"""
START_GAME = 1
DEFAULT = 0
CLAIM_MISSION_REWARD = -1
END_GAME = -2
class APShopItems(IntEnum):
PRODUCE_PT_UP = 0
@ -142,17 +145,17 @@ class DailyMoneyShopItems(IntEnum):
return "月村手毬 Luna say maybe 碎片"
case _:
assert_never(item)
@classmethod
def all(cls) -> list[tuple[str, 'DailyMoneyShopItems']]:
"""获取所有枚举值及其对应的UI显示文本"""
return [(cls.to_ui_text(item), item) for item in cls]
@classmethod
def _is_note(cls, item: 'DailyMoneyShopItems') -> bool:
"""判断是否为笔记"""
return 'Note' in item.name and not item.name.startswith('Note') and not item.name.endswith('Note')
@classmethod
def note_items(cls) -> list[tuple[str, 'DailyMoneyShopItems']]:
"""获取所有枚举值及其对应的UI显示文本"""
@ -280,6 +283,7 @@ class ProduceAction(Enum):
STUDY = 'study'
ALLOWANCE = 'allowance'
REST = 'rest'
CONSULT = 'consult'
@property
def display_name(self):
@ -292,6 +296,7 @@ class ProduceAction(Enum):
ProduceAction.STUDY: '文化课(授業)',
ProduceAction.ALLOWANCE: '活动支给(活動支給)',
ProduceAction.REST: '休息',
ProduceAction.CONSULT: '咨询(相談)',
}
return MAP[self]
@ -310,17 +315,16 @@ class RecommendCardDetectionMode(Enum):
class ProduceConfig(ConfigBaseModel):
enabled: bool = False
"""是否启用培育"""
mode: Literal['regular', 'pro'] = 'regular'
mode: Literal['regular', 'pro', 'master'] = 'regular'
"""
培育模式
进行一次 REGULAR 培育需要 ~30min进行一次 PRO 培育需要 ~1h
进行一次 REGULAR 培育需要 ~30min进行一次 PRO 培育需要 ~1h具体视设备性能而定
"""
produce_count: int = 1
"""培育的次数。"""
idols: list[str] = []
"""
要培育偶像的 IdolCardSkin.id将会按顺序循环选择培育
若未选择任何偶像则使用游戏默认选择的偶像为上次培育偶像
"""
memory_sets: list[int] = []
"""要使用的回忆编成编号,从 1 开始。将会按顺序循环选择使用。"""
@ -341,7 +345,7 @@ class ProduceConfig(ConfigBaseModel):
prefer_lesson_ap: bool = False
"""
优先 SP 课程
启用后若出现 SP 课程则会优先执行 SP 课程而不是推荐课程
若出现多个 SP 课程随机选择一个
"""
@ -357,13 +361,13 @@ class ProduceConfig(ConfigBaseModel):
]
"""
行动优先级
每一周的行动将会按这里设置的优先级执行
"""
recommend_card_detection_mode: RecommendCardDetectionMode = RecommendCardDetectionMode.NORMAL
"""
推荐卡检测模式
严格模式下识别速度会降低但识别准确率会提高
"""
use_ap_drink: bool = False
@ -403,7 +407,7 @@ class CapsuleToysConfig(ConfigBaseModel):
anomaly_capsule_toys_count: int = 0
"""非凡扭蛋机次数"""
class TraceConfig(ConfigBaseModel):
recommend_card_detection: bool = False
"""跟踪推荐卡检测"""
@ -415,12 +419,26 @@ class StartGameConfig(ConfigBaseModel):
start_through_kuyo: bool = False
"""是否通过Kuyo来启动游戏"""
game_package_name: str = 'com.bandinamcoent.idolmaster_gakuen'
game_package_name: str = 'com.bandainamcoent.idolmaster_gakuen'
"""游戏包名"""
kuyo_package_name: str = 'org.kuyo.game'
"""Kuyo包名"""
class EndGameConfig(ConfigBaseModel):
exit_kaa: bool = False
"""退出 kaa"""
kill_game: bool = False
"""关闭游戏"""
kill_dmm: bool = False
"""关闭 DMMGamePlayer"""
kill_emulator: bool = False
"""关闭模拟器"""
shutdown: bool = False
"""关闭系统"""
hibernate: bool = False
"""休眠系统"""
class BaseConfig(ConfigBaseModel):
purchase: PurchaseConfig = PurchaseConfig()
"""商店购买配置"""
@ -458,6 +476,9 @@ class BaseConfig(ConfigBaseModel):
start_game: StartGameConfig = StartGameConfig()
"""启动游戏配置"""
end_game: EndGameConfig = EndGameConfig()
"""关闭游戏配置"""
def conf() -> BaseConfig:
"""获取当前配置数据"""
@ -478,7 +499,7 @@ def upgrade_config() -> str | None:
return None
with open('config.json', 'r', encoding='utf-8') as f:
root = json.load(f)
user_configs = root['user_configs']
old_version = root['version']
messages = []
@ -496,6 +517,11 @@ def upgrade_config() -> str | None:
user_config, msg = upgrade_v2_to_v3(user_config['options'])
messages.append(msg)
version = 3
case 3:
logger.info('Upgrading config: v3 -> v4')
user_config, msg = upgrade_v3_to_v4(user_config['options'])
messages.append(msg)
version = 4
case _:
logger.info('No config upgrade needed.')
return version
@ -580,7 +606,7 @@ class PIdol(IntEnum):
紫云清夏_キミとセミブルー = 紫云清夏_BASE + 4
紫云清夏_初恋 = 紫云清夏_BASE + 5
紫云清夏_学園生活 = 紫云清夏_BASE + 6
花海佑芽_WhiteNightWhiteWish = 花海佑芽_BASE + 0
花海佑芽_学園生活 = 花海佑芽_BASE + 1
花海佑芽_Campusmode = 花海佑芽_BASE + 2
@ -614,7 +640,7 @@ class PIdol(IntEnum):
藤田ことね_学園生活 = 藤田ことね_BASE + 7
def upgrade_v1_to_v2(options: dict[str, Any]) -> tuple[dict[str, Any], str | None]:
def upgrade_v1_to_v2(options: dict[str, Any]) -> tuple[dict[str, Any], str]:
"""
v1 -> v2 变更
@ -793,7 +819,7 @@ def upgrade_v1_to_v2(options: dict[str, Any]) -> tuple[dict[str, Any], str | Non
shutil.copy('config.json', 'config.v1.json')
return options, msg
def upgrade_v2_to_v3(options: dict[str, Any]) -> tuple[dict[str, Any], str | None]:
def upgrade_v2_to_v3(options: dict[str, Any]) -> tuple[dict[str, Any], str]:
"""
v2 -> v3 变更\n
引入了游戏解包数据因此 PIdol 枚举废弃直接改用游戏内 ID
@ -891,5 +917,16 @@ def upgrade_v2_to_v3(options: dict[str, Any]) -> tuple[dict[str, Any], str | Non
shutil.copy('config.json', 'config.v2.json')
return options, msg
def upgrade_v3_to_v4(options: dict[str, Any]) -> tuple[dict[str, Any], str]:
"""
v3 -> v4 变更
自动纠正错误游戏包名
"""
shutil.copy('config.json', 'config.v3.json')
if options['start_game']['game_package_name'] == 'com.bandinamcoent.idolmaster_gakuen':
options['start_game']['game_package_name'] = 'com.bandainamcoent.idolmaster_gakuen'
logger.info('Corrected game package name to com.bandainamcoent.idolmaster_gakuen')
return options, ''
if __name__ == '__main__':
print(PurchaseConfig.model_fields['money_refresh_on'].description)

View File

@ -0,0 +1,94 @@
"""关闭游戏"""
import os
import sys
import logging
import _thread
import threading
from kotonebot.ui import user
from .common import Priority, conf
from kotonebot import task, action, config, device
logger = logging.getLogger(__name__)
@action('关闭游戏.Android', screenshot_mode='manual-inherit')
def android_close():
"""
前置条件-
结束状态游戏关闭
"""
logger.info("Closing game")
if device.current_package() == conf().start_game.game_package_name:
logger.info("Force stopping game")
device.adb.shell(f"am force-stop {conf().start_game.game_package_name}")
logger.info("Game closed successfully")
@action('关闭游戏.Windows', screenshot_mode='manual-inherit')
def windows_close():
"""
前置条件-
结束状态游戏关闭
"""
logger.info("Closing game")
os.system('taskkill /f /im gakumas.exe')
logger.info("Game closed successfully")
@task('关闭游戏', priority=Priority.END_GAME)
def end_game():
"""
游戏结束时执行的任务
"""
# 关闭游戏
if conf().end_game.kill_game:
if device.platform == 'android':
android_close()
elif device.platform == 'windows':
windows_close()
else:
raise ValueError(f'Unsupported platform: {device.platform}')
# 关闭 DMM
if conf().end_game.kill_dmm:
logger.info("Closing DMM")
os.system('taskkill /f /im DMMGamePlayer.exe')
logger.info("DMM closed successfully")
# 关闭模拟器
if conf().end_game.kill_emulator:
emulator_path = config.current.backend.emulator_path
if emulator_path is None:
logger.warning("Emulator path is not set. Skipping")
user.info("「关闭模拟器」功能需要配置「模拟器 exe 文件路径」。")
else:
exe_name = os.path.basename(emulator_path)
os.system(f"taskkill /f /im {exe_name}")
logger.info("Emulator closed")
# TODO: 实现关闭模拟器
# 关机
if conf().end_game.shutdown:
logger.info("Shutting down system")
os.system('shutdown /s /t 60')
logger.info("System will shut down in 60 seconds")
# 休眠
if conf().end_game.hibernate:
logger.info("Hibernating system")
os.system('shutdown /h')
# 退出 kaa
if conf().end_game.exit_kaa:
logger.info("Exiting kaa")
# kaa 不在主线程中运行,一般是以 GUI 运行
if not threading.main_thread() is threading.current_thread():
_thread.interrupt_main()
sys.exit(0)
logger.info("Game ended successfully")
if __name__ == '__main__':
conf().end_game.kill_game = True
conf().end_game.kill_dmm = True
conf().end_game.kill_emulator = True
end_game()

View File

@ -2,6 +2,7 @@ import sys
import logging
import argparse
import importlib.metadata
import runpy
from .kaa import Kaa
from kotonebot.backend.context import tasks_from_id, task_registry
@ -27,6 +28,11 @@ invoke_psr.add_argument('task_ids', nargs='*', help='Tasks to invoke')
# task list 子命令
list_psr = task_subparsers.add_parser('list', help='List all available tasks')
# remote-server 子命令
remote_server_psr = subparsers.add_parser('remote-server', help='Start the remote Windows server')
remote_server_psr.add_argument('--host', default='0.0.0.0', help='Host to bind to')
remote_server_psr.add_argument('--port', type=int, default=8000, help='Port to bind to')
_kaa: Kaa | None = None
def kaa() -> Kaa:
global _kaa
@ -61,6 +67,17 @@ def task_list() -> int:
print(f' * {task.id}: {task.name}\n {task.description.strip()}')
return 0
def remote_server() -> int:
args = psr.parse_args()
try:
# 使用runpy运行remote_windows.py模块
sys.argv = ['remote_windows.py', f'--host={args.host}', f'--port={args.port}']
runpy.run_module('kotonebot.client.implements.remote_windows', run_name='__main__')
return 0
except Exception as e:
print(f'Error starting remote server: {e}')
return -1
def main():
args = psr.parse_args()
if args.subcommands == 'task':
@ -68,6 +85,8 @@ def main():
sys.exit(task_invoke())
elif args.task_command == 'list':
sys.exit(task_list())
elif args.subcommands == 'remote-server':
sys.exit(remote_server())
elif args.subcommands is None:
kaa().set_log_level(logging.DEBUG)
from .gr import main as gr_main

View File

@ -17,7 +17,7 @@ from kotonebot.tasks.common import (
BaseConfig, APShopItems, CapsuleToysConfig, ClubRewardConfig, PurchaseConfig, ActivityFundsConfig,
PresentsConfig, AssignmentConfig, ContestConfig, ProduceConfig,
MissionRewardConfig, DailyMoneyShopItems, ProduceAction,
RecommendCardDetectionMode, TraceConfig, StartGameConfig, UpgradeSupportCardConfig,
RecommendCardDetectionMode, TraceConfig, StartGameConfig, EndGameConfig, UpgradeSupportCardConfig,
)
logger = logging.getLogger(__name__)
@ -34,7 +34,7 @@ def _save_bug_report(
"""
from kotonebot import device
from kotonebot.backend.context import ContextStackVars
# 确保目录存在
os.makedirs('logs', exist_ok=True)
os.makedirs('reports', exist_ok=True)
@ -86,7 +86,7 @@ def _save_bug_report(
# 写出版本号
zipf.writestr('version.txt', version)
# 上传报告
from kotonebot.ui.file_host.sensio import upload
yield "### 上传报告..."
@ -111,10 +111,11 @@ def _save_bug_report(
class KotoneBotUI:
def __init__(self, kaa: Kaa) -> None:
self.is_running: bool = False
self.single_task_running: bool = False
self._kaa = kaa
self._load_config()
self._setup_kaa()
def _setup_kaa(self) -> None:
from kotonebot.backend.debug.vars import debug, clear_saved
@ -131,40 +132,40 @@ class KotoneBotUI:
"""导出 dumps 文件夹为 zip 文件"""
if not os.path.exists('dumps'):
return "dumps 文件夹不存在"
timestamp = datetime.now().strftime('%y-%m-%d-%H-%M-%S')
zip_filename = f'dumps-{timestamp}.zip'
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED, compresslevel=9) as zipf:
for root, dirs, files in os.walk('dumps'):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, 'dumps')
zipf.write(file_path, arcname)
return f"已导出到 {zip_filename}"
def export_logs(self) -> str:
"""导出 logs 文件夹为 zip 文件"""
if not os.path.exists('logs'):
return "logs 文件夹不存在"
timestamp = datetime.now().strftime('%y-%m-%d-%H-%M-%S')
zip_filename = f'logs-{timestamp}.zip'
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED, compresslevel=9) as zipf:
for root, dirs, files in os.walk('logs'):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, 'logs')
zipf.write(file_path, arcname)
return f"已导出到 {zip_filename}"
def get_button_status(self) -> str:
if not hasattr(self, 'run_status'):
return "启动"
if not self.run_status.running:
self.is_running = False
return "启动"
@ -176,7 +177,7 @@ class KotoneBotUI:
for task_name, task in task_registry.items():
status_list.append([task.name, "等待中"])
return status_list
for task_status in self.run_status.tasks:
status_text = {
'pending': '等待中',
@ -192,7 +193,7 @@ class KotoneBotUI:
if not self.is_running:
return self.start_run()
return self.stop_run()
def start_run(self) -> Tuple[str, List[List[str]]]:
self.is_running = True
self.run_status = self._kaa.start_all()
@ -204,6 +205,31 @@ class KotoneBotUI:
self.run_status.interrupt()
return "启动", self.update_task_status()
def start_single_task(self, task_name: str) -> Tuple[str, str]:
if not task_name:
gr.Warning("请先选择一个任务")
return "执行任务", ""
task = None
for name, t in task_registry.items():
if name == task_name:
task = t
break
if task is None:
gr.Warning(f"任务 {task_name} 未找到")
return "执行任务", ""
gr.Info(f"任务 {task_name} 开始执行")
self.single_task_running = True
self.run_status = self._kaa.start([task])
return "停止任务", f"正在执行任务: {task_name}"
def stop_single_task(self) -> Tuple[str, str]:
self.single_task_running = False
if hasattr(self, 'run_status') and self._kaa:
self.run_status.interrupt()
gr.Info("任务已停止")
return "执行任务", "任务已停止"
def save_settings(
self,
adb_ip: str,
@ -261,6 +287,13 @@ class KotoneBotUI:
start_through_kuyo: bool,
game_package_name: str,
kuyo_package_name: str,
# end game
exit_kaa: bool,
kill_game: bool,
kill_dmm: bool,
kill_emulator: bool,
shutdown: bool,
hibernate: bool,
) -> str:
ap_items_enum: List[Literal[0, 1, 2, 3]] = []
ap_items_map: Dict[str, APShopItems] = {
@ -272,7 +305,7 @@ class KotoneBotUI:
for item in ap_items:
if item in ap_items_map:
ap_items_enum.append(ap_items_map[item].value) # type: ignore
self.current_config.backend.adb_ip = adb_ip
self.current_config.backend.adb_port = adb_port
self.current_config.backend.adb_emulator_name = adb_emulator_name
@ -280,7 +313,7 @@ class KotoneBotUI:
self.current_config.keep_screenshots = keep_screenshots
self.current_config.backend.check_emulator = check_emulator
self.current_config.backend.emulator_path = emulator_path
options = BaseConfig(
purchase=PurchaseConfig(
enabled=purchase_enabled,
@ -347,13 +380,21 @@ class KotoneBotUI:
game_package_name=game_package_name,
kuyo_package_name=kuyo_package_name
),
end_game=EndGameConfig(
exit_kaa=exit_kaa,
kill_game=kill_game,
kill_dmm=kill_dmm,
kill_emulator=kill_emulator,
shutdown=shutdown,
hibernate=hibernate
),
trace=TraceConfig(
recommend_card_detection=trace_recommend_card_detection
)
)
self.current_config.options = options
try:
save_config(self.config, "config.json")
gr.Success("设置已保存,请重启程序!")
@ -365,23 +406,23 @@ class KotoneBotUI:
def _create_status_tab(self) -> None:
with gr.Tab("状态"):
gr.Markdown("## 状态")
with gr.Row():
run_btn = gr.Button("启动", scale=1)
if self._kaa.upgrade_msg:
gr.Markdown('### 配置升级报告')
gr.Markdown(self._kaa.upgrade_msg)
gr.Markdown('脚本报错或者卡住?点击"日志"选项卡中的"一键导出报告"可以快速反馈!')
task_status = gr.Dataframe(
headers=["任务", "状态"],
value=self.update_task_status(),
label="任务状态"
)
def on_run_click(evt: gr.EventData) -> Tuple[str, List[List[str]]]:
return self.toggle_run()
run_btn.click(
fn=on_run_click,
outputs=[run_btn, task_status]
@ -400,7 +441,7 @@ class KotoneBotUI:
def _create_task_tab(self) -> None:
with gr.Tab("任务"):
gr.Markdown("## 执行任务")
# 创建任务选择下拉框
task_choices = [task.name for task in task_registry.values()]
task_dropdown = gr.Dropdown(
@ -410,32 +451,67 @@ class KotoneBotUI:
type="value",
value=None
)
# 创建执行按钮
execute_btn = gr.Button("执行任务")
task_result = gr.Markdown("")
# TODO: 实现任务执行逻辑
def execute_single_task(task_name: str) -> str:
if not task_name:
gr.Warning("请先选择一个任务")
def toggle_single_task(task_name: str) -> Tuple[str, str]:
if self.single_task_running:
return self.stop_single_task()
else:
return self.start_single_task(task_name)
def get_task_button_status() -> str:
if not hasattr(self, 'run_status') or not self.run_status.running:
self.single_task_running = False
return "执行任务"
return "停止任务"
def get_single_task_status() -> str:
if not hasattr(self, 'run_status'):
return ""
task = None
for name, task in task_registry.items():
if name == task_name:
task = task
break
if task is None:
gr.Warning(f"任务 {task_name} 未找到")
return ""
gr.Info(f"任务 {task_name} 开始执行。执行结束前,请勿重复点击执行。")
self._kaa.run([task])
gr.Success(f"任务 {task_name} 执行完毕")
if not self.run_status.running and self.single_task_running:
# 任务已结束但状态未更新
self.single_task_running = False
# 检查任务状态
for task_status in self.run_status.tasks:
status = task_status.status
task_name = task_status.task.name
if status == 'finished':
return f"任务 {task_name} 已完成"
elif status == 'error':
return f"任务 {task_name} 出错"
elif status == 'cancelled':
return f"任务 {task_name} 已取消"
return "任务已结束"
if self.single_task_running:
for task_status in self.run_status.tasks:
if task_status.status == 'running':
return f"正在执行任务: {task_status.task.name}"
return "正在准备执行任务..."
return ""
execute_btn.click(
fn=execute_single_task,
fn=toggle_single_task,
inputs=[task_dropdown],
outputs=[execute_btn, task_result]
)
# 添加定时器更新按钮状态和任务状态
gr.Timer(1.0).tick(
fn=get_task_button_status,
outputs=[execute_btn]
)
gr.Timer(1.0).tick(
fn=get_single_task_status,
outputs=[task_result]
)
@ -458,7 +534,7 @@ class KotoneBotUI:
interactive=True
)
screenshot_impl = gr.Dropdown(
choices=['adb', 'adb_raw', 'uiautomator2', 'windows'],
choices=['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows'],
value=self.current_config.backend.screenshot_impl,
label="截图方法",
info=BackendConfig.model_fields['screenshot_impl'].description,
@ -510,7 +586,7 @@ class KotoneBotUI:
value=self.current_config.options.purchase.money_enabled,
info=PurchaseConfig.model_fields['money_enabled'].description
)
# 添加金币商店商品选择
money_items = gr.Dropdown(
multiselect=True,
@ -519,13 +595,13 @@ class KotoneBotUI:
label="金币商店购买物品",
info=PurchaseConfig.model_fields['money_items'].description
)
ap_enabled = gr.Checkbox(
label="启用AP购买",
value=self.current_config.options.purchase.ap_enabled,
info=PurchaseConfig.model_fields['ap_enabled'].description
)
# 转换枚举值为显示文本
selected_items: List[str] = []
ap_items_map = {
@ -538,7 +614,7 @@ class KotoneBotUI:
item_enum = APShopItems(item_value)
if item_enum in ap_items_map:
selected_items.append(ap_items_map[item_enum])
ap_items = gr.Dropdown(
multiselect=True,
choices=list(ap_items_map.values()),
@ -546,7 +622,7 @@ class KotoneBotUI:
label="AP商店购买物品",
info=PurchaseConfig.model_fields['ap_items'].description
)
purchase_enabled.change(
fn=lambda x: gr.Group(visible=x),
inputs=[purchase_enabled],
@ -590,14 +666,14 @@ class KotoneBotUI:
interactive=True,
info=AssignmentConfig.model_fields['online_live_duration'].description
)
assignment_enabled.change(
fn=lambda x: gr.Group(visible=x),
inputs=[assignment_enabled],
outputs=[work_group]
)
return assignment_enabled, mini_live_reassign, mini_live_duration, online_live_reassign, online_live_duration
def _create_contest_settings(self) -> Tuple[gr.Checkbox, gr.Dropdown]:
with gr.Column():
gr.Markdown("### 竞赛设置")
@ -631,7 +707,7 @@ class KotoneBotUI:
)
with gr.Group(visible=self.current_config.options.produce.enabled) as produce_group:
produce_mode = gr.Dropdown(
choices=["regular", "pro"],
choices=["regular", "pro", "master"],
value=self.current_config.options.produce.mode,
label="培育模式",
info=ProduceConfig.model_fields['mode'].description
@ -679,13 +755,13 @@ class KotoneBotUI:
interactive=True,
info=ProduceConfig.model_fields['memory_sets'].description
)
# 添加偶像选择变化时的回调
def update_kotone_warning(selected_idols, recommend_card_detection_mode):
has_kotone = any("藤田ことね" in idol for idol in selected_idols)
is_strict_mode = recommend_card_detection_mode == RecommendCardDetectionMode.STRICT.value
return gr.Markdown(visible=has_kotone and not is_strict_mode)
auto_set_support = gr.Checkbox(
label="自动编成支援卡",
value=self.current_config.options.produce.auto_set_support_card,
@ -759,14 +835,14 @@ class KotoneBotUI:
inputs=[produce_enabled],
outputs=[produce_group]
)
auto_set_memory.change(
fn=lambda x: gr.Group(visible=not x),
inputs=[auto_set_memory],
outputs=[memory_sets_group]
)
return produce_enabled, produce_mode, produce_count, produce_idols, memory_sets, auto_set_memory, auto_set_support, use_pt_boost, use_note_boost, follow_producer, self_study_lesson, prefer_lesson_ap, actions_order, recommend_card_detection_mode, use_ap_drink, skip_commu
def _create_club_reward_settings(self) -> Tuple[gr.Checkbox, gr.Dropdown]:
with gr.Column():
gr.Markdown("### 社团奖励设置")
@ -868,16 +944,52 @@ class KotoneBotUI:
)
return start_game_enabled, start_through_kuyo, game_package_name, kuyo_package_name
def _create_end_game_settings(self) -> Tuple[gr.Checkbox, gr.Checkbox, gr.Checkbox, gr.Checkbox, gr.Checkbox, gr.Checkbox]:
with gr.Column():
gr.Markdown("### 关闭游戏设置")
gr.Markdown("在所有任务执行完毕后执行下面这些操作:\n(执行单个任务时不会触发)")
exit_kaa = gr.Checkbox(
label="退出 kaa",
value=self.current_config.options.end_game.exit_kaa,
info=EndGameConfig.model_fields['exit_kaa'].description
)
kill_game = gr.Checkbox(
label="关闭游戏",
value=self.current_config.options.end_game.kill_game,
info=EndGameConfig.model_fields['kill_game'].description
)
kill_dmm = gr.Checkbox(
label="关闭 DMM",
value=self.current_config.options.end_game.kill_dmm,
info=EndGameConfig.model_fields['kill_dmm'].description
)
kill_emulator = gr.Checkbox(
label="关闭模拟器",
value=self.current_config.options.end_game.kill_emulator,
info=EndGameConfig.model_fields['kill_emulator'].description
)
shutdown = gr.Checkbox(
label="关闭系统",
value=self.current_config.options.end_game.shutdown,
info=EndGameConfig.model_fields['shutdown'].description
)
hibernate = gr.Checkbox(
label="休眠系统",
value=self.current_config.options.end_game.hibernate,
info=EndGameConfig.model_fields['hibernate'].description
)
return exit_kaa, kill_game, kill_dmm, kill_emulator, shutdown, hibernate
def _create_settings_tab(self) -> None:
with gr.Tab("设置"):
gr.Markdown("## 设置")
# 模拟器设置
emulator_settings = self._create_emulator_settings()
# 商店购买设置
purchase_settings = self._create_purchase_settings()
# 活动费设置
with gr.Column():
gr.Markdown("### 活动费设置")
@ -886,7 +998,7 @@ class KotoneBotUI:
value=self.current_config.options.activity_funds.enabled,
info=ActivityFundsConfig.model_fields['enabled'].description
)
# 礼物设置
with gr.Column():
gr.Markdown("### 礼物设置")
@ -895,16 +1007,16 @@ class KotoneBotUI:
value=self.current_config.options.presents.enabled,
info=PresentsConfig.model_fields['enabled'].description
)
# 工作设置
work_settings = self._create_work_settings()
# 竞赛设置
contest_settings = self._create_contest_settings()
# 培育设置
produce_settings = self._create_produce_settings()
# 任务奖励设置
with gr.Column():
gr.Markdown("### 任务奖励设置")
@ -913,7 +1025,7 @@ class KotoneBotUI:
value=self.current_config.options.mission_reward.enabled,
info=MissionRewardConfig.model_fields['enabled'].description
)
# 社团奖励设置
club_reward_settings = self._create_club_reward_settings()
@ -925,9 +1037,9 @@ class KotoneBotUI:
value=self.current_config.options.upgrade_support_card.enabled,
info=UpgradeSupportCardConfig.model_fields['enabled'].description
)
capsule_toys_settings = self._create_capsule_toys_settings()
# 跟踪设置
with gr.Column():
gr.Markdown("### 跟踪设置")
@ -941,10 +1053,13 @@ class KotoneBotUI:
# 启动游戏设置
start_game_settings = self._create_start_game_settings()
# 关闭游戏设置
end_game_settings = self._create_end_game_settings()
save_btn = gr.Button("保存设置")
result = gr.Markdown()
# 收集所有设置组件
all_settings = [
*emulator_settings,
@ -959,9 +1074,10 @@ class KotoneBotUI:
*club_reward_settings,
upgrade_support_card_enabled,
*capsule_toys_settings,
*start_game_settings
*start_game_settings,
*end_game_settings
]
save_btn.click(
fn=self.save_settings,
inputs=all_settings,
@ -971,7 +1087,7 @@ class KotoneBotUI:
def _create_log_tab(self) -> None:
with gr.Tab("日志"):
gr.Markdown("## 日志")
with gr.Column():
with gr.Row():
export_dumps_btn = gr.Button("导出 dump")
@ -979,7 +1095,7 @@ class KotoneBotUI:
with gr.Row():
save_report_btn = gr.Button("一键导出报告")
result_text = gr.Markdown("等待操作\n\n\n")
export_dumps_btn.click(
fn=self.export_dumps,
outputs=[result_text]
@ -1042,7 +1158,7 @@ class KotoneBotUI:
with gr.Blocks(title="琴音小助手", css="#container { max-width: 800px; margin: auto; padding: 20px; }") as app:
with gr.Column(elem_id="container"):
gr.Markdown(f"# 琴音小助手 v{self._kaa.version}")
with gr.Tabs():
self._create_status_tab()
self._create_task_tab()
@ -1050,7 +1166,7 @@ class KotoneBotUI:
self._create_log_tab()
self._create_whats_new_tab()
self._create_screen_tab()
return app
def main(kaa: Kaa | None = None) -> None:

View File

@ -18,7 +18,8 @@ from ..produce.non_lesson_actions import (
enter_allowance, allowance_available,
study_available, enter_study,
is_rest_available, rest,
outing_available, enter_outing
outing_available, enter_outing,
consult_available, enter_consult
)
logger = logging.getLogger(__name__)
@ -67,6 +68,7 @@ def handle_recommended_action(final_week: bool = False) -> ProduceAction | None:
R.InPurodyuusu.TextSenseiTipVocal,
R.InPurodyuusu.TextSenseiTipVisual,
R.InPurodyuusu.TextSenseiTipRest,
R.InPurodyuusu.TextSenseiTipConsult,
]):
break
it.wait()
@ -94,6 +96,9 @@ def handle_recommended_action(final_week: bool = False) -> ProduceAction | None:
elif result.index == 3:
rest()
return ProduceAction.REST
elif result.index == 4:
enter_consult()
return ProduceAction.CONSULT
else:
return None
# 点击课程
@ -375,7 +380,8 @@ def produce_end():
device.click(image.expect_wait(R.InPurodyuusu.ButtonComplete))
wait(0.5, before='screenshot')
break
# 培育得硬币活动时,弹出的硬币获得对话框
# 1. P任务解锁提示
# 2. 培育得硬币活动时,弹出的硬币获得对话框
elif dialog.no():
pass
@ -468,6 +474,10 @@ def handle_action(action: ProduceAction, final_week: bool = False) -> ProduceAct
if allowance_available():
enter_allowance()
return ProduceAction.ALLOWANCE
case ProduceAction.CONSULT:
if consult_available():
enter_consult()
return ProduceAction.CONSULT
case _:
logger.warning("Unknown action: %s", action)
return None
@ -492,7 +502,7 @@ def week_normal(week_first: bool = False):
match action:
case (
ProduceAction.REST |
ProduceAction.OUTING | ProduceAction.STUDY | ProduceAction.ALLOWANCE
ProduceAction.OUTING | ProduceAction.STUDY | ProduceAction.ALLOWANCE | ProduceAction.CONSULT
):
# 什么都不需要做
pass
@ -520,7 +530,8 @@ def week_final_lesson():
match action:
case (
ProduceAction.REST |
ProduceAction.OUTING | ProduceAction.STUDY | ProduceAction.ALLOWANCE
ProduceAction.OUTING | ProduceAction.STUDY | ProduceAction.ALLOWANCE |
ProduceAction.CONSULT
):
# 什么都不需要做
pass
@ -625,6 +636,42 @@ def hajime_pro(week: int = -1, start_from: int = 1):
logger.info("Week %d started.", i + start_from)
w()
@action("执行 MASTER 培育")
def hajime_master(week: int = -1, start_from: int = 1):
"""
MASTER 模式
:param week: 第几周从1开始-1表示全部
:param start_from: 从第几周开始从1开始
"""
weeks = [
lambda: week_normal(True), # 1
week_normal, # 2
week_normal, # 3
week_normal, # 4
week_normal, # 5
week_normal, # 6
week_final_lesson, # 7
week_mid_exam, # 8
week_normal, # 9
week_normal, # 10
week_normal, # 11
week_normal, # 12
week_normal, # 13
week_normal, # 14
week_normal, # 15
week_normal, # 16
week_final_lesson, # 17
week_final_exam, # 18
]
if week != -1:
logger.info("Week %d started.", week)
weeks[week - 1]()
else:
for i, w in enumerate(weeks[start_from-1:]):
logger.info("Week %d started.", i + start_from)
w()
@action('是否在考试场景')
def is_exam_scene():
"""是否在考试场景"""
@ -675,7 +722,7 @@ def detect_produce_scene(ctx: DispatcherContext) -> ProduceStage:
return 'unknown'
@action('开始 Hajime 培育')
def hajime_from_stage(stage: ProduceStage, type: Literal['regular', 'pro'], week: int):
def hajime_from_stage(stage: ProduceStage, type: Literal['regular', 'pro', 'master'], week: int):
"""
开始 Regular 培育
"""
@ -684,11 +731,23 @@ def hajime_from_stage(stage: ProduceStage, type: Literal['regular', 'pro'], week
# 提取周数
remaining_week = texts.squash().replace('ó', '6').numbers()
if not remaining_week:
raise UnrecoverableError("Failed to detect week.")
raise UnrecoverableError("Failed to detect week. text=" + repr(texts.squash()))
# 判断阶段
MID_WEEK = 6 if type == 'regular' else 7
FINAL_WEEK = 13 if type == 'regular' else 16
function = hajime_regular if type == 'regular' else hajime_pro
match type:
case 'regular':
MID_WEEK = 6
FINAL_WEEK = 13
function = hajime_regular
case 'pro':
MID_WEEK = 7
FINAL_WEEK = 16
function = hajime_pro
case 'master':
MID_WEEK = 8
FINAL_WEEK = 18
function = hajime_master
case _:
assert_never(type)
if texts.where(contains('中間')):
week = MID_WEEK - remaining_week[0]
function(start_from=week)
@ -700,27 +759,37 @@ def hajime_from_stage(stage: ProduceStage, type: Literal['regular', 'pro'], week
elif stage == 'exam-ongoing':
# TODO: 应该直接调用 week_final_exam 而不是再写一次
logger.info("Exam ongoing. Start exam.")
if type == 'regular':
if week > 6: # 第六周为期中考试
exam('final')
return produce_end()
else:
exam('mid')
return hajime_from_stage(detect_produce_scene(), type, week)
elif type == 'pro':
if week > 7:
exam('final')
return produce_end()
else:
exam('mid')
return hajime_from_stage(detect_produce_scene(), type, week)
match type:
case 'regular':
if week > 6: # 第六周为期中考试
exam('final')
return produce_end()
else:
exam('mid')
return hajime_from_stage(detect_produce_scene(), type, week)
case 'pro':
if week > 7:
exam('final')
return produce_end()
else:
exam('mid')
return hajime_from_stage(detect_produce_scene(), type, week)
case 'master':
if week > 8:
exam('final')
return produce_end()
else:
exam('mid')
return hajime_from_stage(detect_produce_scene(), type, week)
case _:
assert_never(type)
elif stage == 'practice-ongoing':
# TODO: 应该直接调用 week_final_exam 而不是再写一次
logger.info("Practice ongoing. Start practice.")
practice()
return hajime_from_stage(detect_produce_scene(), type, week)
else:
raise UnrecoverableError(f'Cannot resume produce REGULAR from stage "{stage}".')
raise UnrecoverableError(f'Cannot resume produce from stage "{stage}".')
@action('继续 Regular 培育')
def resume_regular_produce(week: int):
@ -740,6 +809,15 @@ def resume_pro_produce(week: int):
"""
hajime_from_stage(detect_produce_scene(), 'pro', week)
@action('继续 MASTER 培育')
def resume_master_produce(week: int):
"""
继续 MASTER 培育
:param week: 当前周数
"""
hajime_from_stage(detect_produce_scene(), 'master', week)
if __name__ == '__main__':
from logging import getLogger
@ -760,44 +838,4 @@ if __name__ == '__main__':
init_context(config_type=BaseConfig)
manual_context().begin()
debug.auto_save_to_folder = 'dumps'
debug.enabled = True
# hajime_regular(start_from=1)
# pf = Profiler('profiler')
# pf.begin()
# # do_produce(conf().produce.idols[0], 'pro')
# practice()
# hajime_pro(start_from=16)
# pf.end()
# pf.snakeviz()
# while True:
# cards = obtain_cards()
# print(cards)
# sleep(1)
# practice()
# week_mid_exam()
# week_final_exam()
# exam('final')
# produce_end()
# hajime_pro(start_from=16)
# exam('mid')
stage = (detect_produce_scene())
hajime_from_stage(stage, 'pro', 0)
# click_recommended_card(card_count=skill_card_count())
# exam('mid')
# hajime_regular(start_from=7)
# import cv2
# while True:
# img = device.screenshot()
# cv2.imshow('123', img)
# cv2.waitKey(1)
debug.enabled = True

View File

@ -5,12 +5,14 @@
"""
from logging import getLogger
from kotonebot.tasks.game_ui import dialog
from .. import R
from ..common import conf
from ..produce.common import fast_acquisitions
from ..game_ui.commu_event_buttons import CommuEventButtonUI
from kotonebot.util import Interval
from kotonebot.util import Countdown, Interval
from kotonebot.errors import UnrecoverableError
from kotonebot import device, image, action, sleep
from kotonebot.backend.dispatch import SimpleDispatcher
@ -32,6 +34,14 @@ def study_available():
# [screenshots/produce/action_study1.png]
return image.find(R.InPurodyuusu.ButtonIconStudy) is not None
@action('检测是否可以执行相談')
def consult_available():
"""
判断是否可以执行相談
"""
return image.find(R.InPurodyuusu.ButtonIconConsult) is not None
# TODO: 把进入授業的逻辑和执行授業的逻辑分离
@action('执行授業')
def enter_study():
"""
@ -131,6 +141,87 @@ def enter_allowance():
it.wait()
logger.info("活動支給 completed.")
# TODO: 将逻辑用循环改写
@action('执行相談', screenshot_mode='manual-inherit')
def enter_consult():
"""
执行相談
前置条件位于行动页面且所有行动按钮清晰可见 \n
结束状态位于行动页面
"""
logger.info("Executing 相談.")
logger.info("Double clicking on 相談.")
device.screenshot()
device.double_click(image.expect(R.InPurodyuusu.ButtonIconConsult), interval=1)
# 等待进入页面
while not image.find(R.InPurodyuusu.IconTitleConsult):
device.screenshot()
logger.debug("Waiting for 相談 screen.")
fast_acquisitions()
# # 尝试固定购买第一个物品
# device.click(R.InPurodyuusu.PointConsultFirstItem)
# sleep(0.5)
# device.click(image.expect(R.InPurodyuusu.ButtonIconExchange))
# # 等待弹窗
# timeout_cd = Countdown(sec=5).start()
# while not timeout_cd.expired():
# if dialog.yes():
# break
# # 结束
# while not image.find(R.InPurodyuusu.ButtonEndConsult):
# fast_acquisitions()
# device.click(image.expect_wait(R.InPurodyuusu.ButtonEndConsult))
# # 可能会弹出确认对话框
# timeout_cd.reset().start()
# while not timeout_cd.expired():
# dialog.yes()
device.click(R.InPurodyuusu.PointConsultFirstItem)
sleep(0.3)
it = Interval()
wait_purchase_cd = Countdown(sec=5)
exit_cd = Countdown(sec=5)
purchase_clicked = False
purchase_confirmed = False
exit_clicked = False
while True:
device.screenshot()
it.wait()
if wait_purchase_cd.expired():
# 等待购买确认对话框超时后直接认为购买完成
purchase_confirmed = True
if dialog.yes():
if purchase_clicked:
purchase_confirmed = True
continue
elif purchase_confirmed:
continue
elif exit_clicked:
break
if image.find(R.InPurodyuusu.ButtonIconExchange, colored=True):
device.click()
purchase_clicked = True
continue
if purchase_confirmed and image.find(R.InPurodyuusu.ButtonEndConsult):
device.click()
exit_clicked = True
exit_cd.start()
continue
# 等待退出对话框超时,直接退出
if exit_cd.expired():
break
if not purchase_confirmed:
device.click(R.InPurodyuusu.PointConsultFirstItem)
# 处理不能购买的情况(超时)
# TODO: 应当检测画面文字/图标而不是用超时
wait_purchase_cd.start()
logger.info("相談 completed.")
@action('判断是否可以休息')
def is_rest_available():
"""
@ -213,22 +304,4 @@ if __name__ == '__main__':
from kotonebot.backend.context import manual_context, init_context
init_context()
manual_context().begin()
# 获取三个选项的内容
ui = CommuEventButtonUI()
buttons = ui.all()
if not buttons:
raise UnrecoverableError("Failed to find any buttons.")
# 选中 +30 的选项
target_btn = next((btn for btn in buttons if btn.description == '+30'), None)
if target_btn is None:
logger.error("Failed to find +30 option. Pick the first button instead.")
target_btn = buttons[0]
# 固定点击 Vi. 选项
logger.debug('Clicking "%s".', target_btn.description)
if target_btn.selected:
device.click(target_btn)
else:
device.double_click(target_btn)
while fast_acquisitions() is None:
logger.info("Waiting for acquisitions finished.")
logger.info("授業 completed.")
enter_consult()

View File

@ -1,11 +1,11 @@
import logging
from itertools import cycle
from typing import Optional, Literal
from typing_extensions import assert_never
from kotonebot.backend.context.context import wait
from kotonebot.tasks.game_ui.scrollable import Scrollable
from kotonebot.ui import user
from kotonebot.util import Countdown, Interval
from kotonebot.backend.context.context import wait
from kotonebot.backend.dispatch import SimpleDispatcher
from .. import R
@ -13,7 +13,8 @@ from ..common import conf
from ..game_ui import dialog
from ..actions.scenes import at_home, goto_home
from ..game_ui.idols_overview import locate_idol
from ..produce.in_purodyuusu import hajime_pro, hajime_regular, resume_pro_produce, resume_regular_produce
from ..produce.in_purodyuusu import hajime_pro, hajime_regular, hajime_master, resume_pro_produce, resume_regular_produce, \
resume_master_produce
from kotonebot import device, image, ocr, task, action, sleep, contains
logger = logging.getLogger(__name__)
@ -135,13 +136,16 @@ def resume_produce():
mode_result = image.find_multi([
R.Produce.ResumeDialogRegular,
R.Produce.ResumeDialogPro,
R.Produce.ResumeDialogMaster
])
if not mode_result:
raise ValueError('Failed to detect produce mode.')
if mode_result.index == 0:
mode = 'regular'
else:
elif mode_result.index == 1:
mode = 'pro'
else:
mode = 'master'
logger.info(f'Produce mode: {mode}')
retry_count = 0
max_retries = 5
@ -167,16 +171,20 @@ def resume_produce():
# [kotonebot-resource/sprites/jp/produce/produce_resume.png]
logger.info('Click resume button.')
device.click(btn_resume)
# 继续流程
if mode == 'regular':
resume_regular_produce(current_week)
else:
resume_pro_produce(current_week)
match mode:
case 'regular':
resume_regular_produce(current_week)
case 'pro':
resume_pro_produce(current_week)
case 'master':
resume_master_produce(current_week)
case _:
assert_never(mode)
@action('执行培育', screenshot_mode='manual-inherit')
def do_produce(
idol_skin_id: str,
mode: Literal['regular', 'pro'],
mode: Literal['regular', 'pro', 'master'],
memory_set_index: Optional[int] = None
) -> bool:
"""
@ -200,7 +208,8 @@ def do_produce(
return True
# 0. 进入培育页面
mode_text = 'REGULAR' if mode == 'regular' else 'PRO'
mode_text = mode.upper()
logger.info(f'Enter produce page. Mode: {mode_text}')
result = (SimpleDispatcher('enter_produce')
.click(R.Produce.ButtonProduce)
.click(contains(mode_text))
@ -292,10 +301,15 @@ def do_produce(
device.click()
if image.find(R.Common.ButtonConfirmNoIcon):
device.click()
if mode == 'regular':
hajime_regular()
else:
hajime_pro()
match mode:
case 'regular':
hajime_regular()
case 'pro':
hajime_pro()
case 'master':
hajime_master()
case _:
assert_never(mode)
return True
@task('培育')

View File

@ -1,5 +1,6 @@
"""启动游戏,领取登录奖励,直到首页为止"""
import os
import ctypes
import logging
from . import R
@ -9,7 +10,7 @@ from kotonebot.util import Countdown, Interval
from .actions.scenes import at_home, goto_home
from .actions.commu import handle_unread_commu
from kotonebot.errors import GameUpdateNeededError
from kotonebot import task, action, sleep, device, image, ocr
from kotonebot import task, action, sleep, device, image, ocr, config
logger = logging.getLogger(__name__)
@ -96,7 +97,9 @@ def windows_launch():
结束状态游戏窗口出现
"""
# 检查管理员权限
import ctypes, os
# TODO: 检查截图类型不应该依赖配置文件,而是直接检查 device 实例
if config.current.backend.screenshot_impl == 'remote_windows':
raise NotImplementedError("Task `start_game` is not supported on remote_windows.")
try:
is_admin = os.getuid() == 0 # type: ignore
except AttributeError:

View File

@ -217,6 +217,7 @@ class Countdown:
def reset(self):
self.start_time = time.time()
return self
class Stopwatch:
def __init__(self):

View File

@ -296,7 +296,7 @@ def make_img(ide: Literal['vscode', 'pycharm'], path: str, title: str, height: s
elif ide == 'pycharm':
return f'.. image:: http://localhost:6532/image?path={unify_path(path)}\n'
else:
raise ValueError(f'Unknown IDE: {ide}')
return f'<img src="file:///{escape(path)}" title="{title}" height="{height}" />\n'
def make_classes(resources: list[Resource], ide: Literal['vscode', 'pycharm']) -> list[OutputClass]:
"""根据 Sprite 数据生成 R.py 中的类信息。"""