Merge branch 'feat/launcher'

This commit is contained in:
XcantloadX 2025-07-03 09:09:58 +08:00
commit b325a20b60
25 changed files with 2505 additions and 146 deletions

View File

@ -20,5 +20,5 @@
"venv",
"**/node_modules"
],
"python.analysis.diagnosticMode": "workspace"
// "python.analysis.diagnosticMode": "workspace"
}

16
bootstrap/README.md Normal file
View File

@ -0,0 +1,16 @@
# bootstrap
此文件夹下存放的是为了简化分发而编写的启动器代码。
## bootstrap/kaa-bootstrap
启动器本体,由 Python 编写。负责:
1. 寻找最快的 PyPI 镜像源
2. 安装与更新 pip 和 kaa 本体
3. 读入配置文件,检查是否需要管理员权限
4. 启动 kaa
打包产物bootstrap.pyz
## bootstrap/kaa-wrapper
启动器包装器,由 C++ 编写,用于调用 Python 启动 kaa-bootstrap。
打包产物kaa.exe

View File

@ -0,0 +1,7 @@
from terminal import print_status
from launcher import main_launch
try:
main_launch()
except KeyboardInterrupt:
print_status("运行结束", status='info')

View File

@ -0,0 +1,754 @@
import os
import sys
import json
import ctypes
import codecs
import locale
import logging
import subprocess
import importlib.metadata
import argparse
import tempfile
import zipfile
import shutil
from pathlib import Path
from collections import deque
from datetime import datetime
from time import sleep
from typing import Optional, Dict, Any, TypedDict, Literal, List
from request import head, HTTPError, NetworkError
from terminal import (
Color, print_header, print_status, clear_screen,
get_terminal_width, get_display_width, truncate_string,
hide_cursor, show_cursor, move_cursor_up, wait_key, get_terminal_height
)
from repo import Version
# 配置文件的类型定义
class BackendConfig(TypedDict, total=False):
type: Literal['custom', 'mumu12', 'leidian', 'dmm']
screenshot_impl: Literal['adb', 'adb_raw', 'uiautomator2', 'windows', 'remote_windows', 'nemu_ipc']
class MiscConfig(TypedDict, total=False):
check_update: Literal['never', 'startup']
auto_install_update: bool
class UserConfig(TypedDict, total=False):
name: str
id: str
category: str
description: str
backend: BackendConfig
keep_screenshots: bool
options: Dict[str, Any] # 这里包含 misc 等配置
class Config(TypedDict, total=False):
version: int
user_configs: List[UserConfig]
# 获取当前Python解释器路径
PYTHON_EXECUTABLE = sys.executable
TRUSTED_HOSTS = "pypi.org files.pythonhosted.org pypi.python.org mirrors.aliyun.com mirrors.cloud.tencent.com mirrors.tuna.tsinghua.edu.cn"
def setup_logging():
"""
配置日志记录
"""
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%y-%m-%d-%H-%M-%S")
log_file = log_dir / f"bootstrap-{timestamp}.log"
logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d] %(message)s',
filename=log_file,
filemode='w',
encoding='utf-8'
)
# 记录未捕获的异常
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logging.error("未捕获的异常", exc_info=(exc_type, exc_value, exc_traceback))
sys.excepthook = handle_exception
logging.info("日志记录器已初始化。")
PIP_SERVERS = [
"https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple",
"https://mirrors.aliyun.com/pypi/simple",
"https://mirrors.cloud.tencent.com/pypi/simple",
"https://pypi.org/simple",
]
def is_admin() -> bool:
"""
检查当前进程是否具有管理员权限
:return: 如果具有管理员权限返回True否则返回False
:rtype: bool
"""
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
def test_url_availability(url: str) -> bool:
"""
测试URL是否可访问返回200状态码
:param url: 要测试的URL
:type url: str
:return: 如果URL可访问返回True否则返回False
:rtype: bool
"""
try:
with head(url, timeout=10) as response:
return response.status_code == 200
except (HTTPError, NetworkError):
return False
except Exception:
return False
def get_working_pip_server() -> Optional[str]:
"""
获取可用的pip服务器
:return: 第一个可用的pip服务器URL如果都不可用返回None
:rtype: Optional[str]
"""
for server in PIP_SERVERS:
msg = f"正在测试: {server}"
print_status(msg, status='info', indent=1)
logging.info(msg)
if test_url_availability(server):
msg = f"找到可用的pip服务器: {server}"
print_status(msg, status='success', indent=1)
logging.info(msg)
return server
msg = "所有pip服务器都不可用"
print_status(msg, status='error')
logging.error(msg)
return None
def package_version(package_name: str) -> Optional[str]:
"""
获取指定包的版本信息
:param package_name: 包名称
:type package_name: str
:return: 包版本字符串如果包不存在则返回 None
:rtype: Optional[str]
:Example:
.. code-block:: python
>>> package_version("requests")
'2.31.0'
>>> package_version("nonexistent_package")
None
:raises: 无异常抛出包不存在时返回 None
"""
try:
return importlib.metadata.version(package_name)
except importlib.metadata.PackageNotFoundError:
return None
def run_command(command: str, check: bool = True, verbatim: bool = False, scroll_region_size: int = -1, log_output: bool = True) -> bool:
"""
运行命令并实时输出返回是否成功
:param command: 要运行的命令
:param check: 是否检查返回码
:param verbatim: 是否原样输出不使用滚动UI
:param scroll_region_size: 滚动区域的大小, -1 表示动态计算
:param log_output: 是否将命令输出记录到日志中
:return: 命令是否成功执行
"""
logging.info(f"执行命令: {command}")
# 设置环境变量以确保正确的编码处理
env = os.environ.copy()
env["PYTHONIOENCODING"] = "utf-8"
# 获取系统默认编码
system_encoding = locale.getpreferredencoding()
# 创建解码器
def decode_output(line: bytes) -> str:
try:
# 首先尝试UTF-8解码
return line.decode('utf-8')
except UnicodeDecodeError:
try:
# 如果UTF-8失败尝试系统默认编码
return line.decode(system_encoding)
except UnicodeDecodeError:
# 如果都失败了,使用'replace'策略
return line.decode('utf-8', errors='replace')
if verbatim:
print(f"▶ 执行命令: {command}")
try:
process = subprocess.Popen(
command, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
env=env
)
if process.stdout:
for line in iter(process.stdout.readline, b''):
clean_line = decode_output(line).strip('\n')
print(clean_line)
if log_output:
logging.info(clean_line)
returncode = process.wait()
logging.info(f"命令执行完毕,返回码: {returncode}")
return returncode == 0
except FileNotFoundError:
msg = f"命令未找到: {command.split()[0]}"
print_status(msg, status='error', indent=1)
logging.error(msg)
return False
except Exception as e:
msg = f"命令执行时发生错误: {e}"
print_status(msg, status='error', indent=1)
logging.error(msg, exc_info=True)
return False
# --- 滚动UI模式 ---
if scroll_region_size == -1:
# Heuristic: leave some lines for context above and below.
# Use at least 5 lines.
SCROLL_REGION_SIZE = max(5, get_terminal_height() - 8)
else:
SCROLL_REGION_SIZE = scroll_region_size
terminal_width = get_terminal_width()
# 打印初始状态行
prefix = ""
prefix_width = 2 # "▶ "
available_width = terminal_width - prefix_width
command_text = f"执行命令: {command}"
truncated_command = truncate_string(command_text, available_width)
padding_len = available_width - get_display_width(truncated_command)
padding = ' ' * max(0, padding_len)
print(f"{Color.GRAY}{prefix}{truncated_command}{padding}{Color.RESET}")
process = subprocess.Popen(
command, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
env=env
)
output_buffer = deque(maxlen=SCROLL_REGION_SIZE)
lines_rendered = 0
hide_cursor()
try:
if process.stdout:
for line in iter(process.stdout.readline, b''):
stripped_line = decode_output(line).strip()
output_buffer.append(stripped_line)
if log_output:
logging.info(stripped_line)
# 移动光标到绘制区域顶部
if lines_rendered > 0:
move_cursor_up(lines_rendered)
lines_rendered = min(len(output_buffer), SCROLL_REGION_SIZE)
# 重新绘制滚动区域
lines_to_render = list(output_buffer)
for i in range(lines_rendered):
line_to_print = lines_to_render[i] if i < len(lines_to_render) else ""
prefix = f"{Color.GRAY}|{Color.RESET} "
prefix_width = 2
available_width = terminal_width - prefix_width
truncated = truncate_string(line_to_print, available_width)
padding_len = available_width - get_display_width(truncated)
padding = ' ' * max(0, padding_len)
# 使用 \r 和 \n 刷新行
print(f"\r{prefix}{truncated}{padding}")
returncode = process.wait()
logging.info(f"命令执行完毕,返回码: {returncode}")
finally:
show_cursor()
# 清理滚动区域
if lines_rendered > 0:
move_cursor_up(lines_rendered)
for _ in range(lines_rendered):
print(' ' * terminal_width)
move_cursor_up(lines_rendered)
# 更新最终状态行
move_cursor_up(1)
if returncode == 0:
final_symbol = f"{Color.GREEN}"
success = True
else:
final_symbol = f"{Color.RED}"
success = False
# 重新计算填充以确保行被完全覆盖
final_prefix = f"{final_symbol} "
final_prefix_width = 2 # "✓ " or "✗ "
available_width = terminal_width - final_prefix_width
final_line_text = f"执行命令: {command}"
truncated_final_line = truncate_string(final_line_text, available_width)
padding_len = available_width - get_display_width(truncated_final_line)
padding = ' ' * max(0, padding_len)
print(f"\r{final_prefix}{truncated_final_line}{Color.RESET}{padding}")
if check and not success:
msg = f"命令执行失败,返回码: {returncode}"
print_status(msg, status='error', indent=1)
logging.error(msg)
return False
return success
def check_ksaa_update_available(pip_server: str, current_version: Version) -> tuple[bool, Version | None, Version | None]:
"""
检查ksaa包是否有新版本可用
:param pip_server: pip服务器URL
:type pip_server: str
:param current_version: 当前版本
:type current_version: Version
:return: (是否有更新, 当前版本, 最新版本)
:rtype: tuple[bool, Optional[Version], Optional[Version]]
"""
try:
# 使用repo.py中的list_versions函数和Version类获取最新版本信息
from repo import list_versions, Version
try:
versions = list_versions("ksaa", server_url=pip_server)
if versions and len(versions) > 0:
latest_version = versions[0].version
# 使用Version类的比较功能
if latest_version > current_version:
return True, current_version, latest_version
except Exception as e:
logging.warning(f"从服务器 {pip_server} 获取版本信息失败: {e}")
print_status(f"从服务器 {pip_server} 获取版本信息失败: {e}", status='error')
# 如果指定服务器失败尝试使用默认PyPI服务器
try:
versions = list_versions("ksaa")
if versions and len(versions) > 0:
latest_version = versions[0].version
# 使用Version类的比较功能
if latest_version > current_version:
return True, current_version, latest_version
except Exception as e2:
logging.warning(f"从PyPI获取版本信息也失败: {e2}")
return False, current_version, latest_version if 'latest_version' in locals() else None
except Exception as e:
logging.warning(f"检查ksaa更新时发生错误: {e}")
return False, None, None
def print_update_notice(current_version: str, latest_version: str):
"""
打印更新提示信息
:param current_version: 当前版本
:type current_version: str
:param latest_version: 最新版本
:type latest_version: str
"""
clear_screen()
print()
print(f"{Color.YELLOW}{Color.BOLD}" + "=" * 60)
print(f"{Color.YELLOW}{Color.BOLD}⚠️ 发现新版本可用!")
print(f"{Color.YELLOW}{Color.BOLD}" + "=" * 60)
print(f"{Color.YELLOW}当前版本: {current_version}")
print(f"{Color.YELLOW}最新版本: {latest_version}")
print(f"{Color.YELLOW}建议开启自动更新或在设置中手动安装新版本。")
print(f"{Color.YELLOW}5s 后继续启动")
print(f"{Color.YELLOW}{Color.BOLD}" + "=" * 60 + f"{Color.RESET}")
print()
sleep(5)
def install_ksaa_version(pip_server: str, trusted_hosts: str, version: str) -> bool:
"""
安装指定版本的ksaa包
:param pip_server: pip服务器URL
:type pip_server: str
:param trusted_hosts: 信任的主机列表
:type trusted_hosts: str
:param version: 要安装的版本号
:type version: str
:return: 安装是否成功
:rtype: bool
"""
print_status(f"安装琴音小助手 v{version}", status='info')
install_command = f'"{PYTHON_EXECUTABLE}" -m pip install --index-url {pip_server} --trusted-host "{trusted_hosts}" --no-warn-script-location ksaa=={version}'
return run_command(install_command)
def install_ksaa_from_zip(zip_path: str) -> bool:
"""
从zip文件安装ksaa包
:param zip_path: zip文件路径
:type zip_path: str
:return: 安装是否成功
:rtype: bool
"""
zip_file = Path(zip_path)
if not zip_file.exists():
msg = f"zip文件不存在: {zip_path}"
print_status(msg, status='error')
logging.error(msg)
return False
if not zip_file.suffix.lower() == '.zip':
msg = f"文件不是zip格式: {zip_path}"
print_status(msg, status='error')
logging.error(msg)
return False
print_status(f"从zip文件安装琴音小助手: {zip_path}", status='info')
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
try:
# 解压zip文件
print_status("解压zip文件...", status='info', indent=1)
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
zip_ref.extractall(temp_path)
# 使用pip install --find-links安装
print_status("安装ksaa包...", status='info', indent=1)
install_command = f'"{PYTHON_EXECUTABLE}" -m pip install --no-warn-script-location --no-cache-dir --upgrade --no-deps --force-reinstall --no-index --find-links "{temp_path.absolute()}" ksaa'
return run_command(install_command)
except zipfile.BadZipFile:
msg = f"无效的zip文件: {zip_path}"
print_status(msg, status='error')
logging.error(msg)
return False
except Exception as e:
msg = f"从zip文件安装失败: {e}"
print_status(msg, status='error')
logging.error(msg, exc_info=True)
return False
def install_pip_and_ksaa(pip_server: str, check_update: bool = True, install_update: bool = True) -> bool:
"""
安装和更新pip以及ksaa包
:param pip_server: pip服务器URL
:type pip_server: str
:param check_update: 是否检查更新
:type check_update: bool
:param install_update: 是否安装更新
:type install_update: bool
:return: 安装是否成功
:rtype: bool
"""
print_header("安装与更新小助手", color=Color.BLUE)
# 升级pip
print_status("更新 pip", status='info')
upgrade_pip_command = f'"{PYTHON_EXECUTABLE}" -m pip install -i {pip_server} --trusted-host "{TRUSTED_HOSTS}" --upgrade pip'
if not run_command(upgrade_pip_command):
return False
# 默认安装逻辑
install_command = f'"{PYTHON_EXECUTABLE}" -m pip install --upgrade --index-url {pip_server} --trusted-host "{TRUSTED_HOSTS}" --no-warn-script-location ksaa'
ksaa_version_str = package_version("ksaa")
# 未安装
if not ksaa_version_str:
print_status("安装琴音小助手", status='info')
return run_command(install_command)
# 已安装,检查更新
else:
ksaa_version = Version(ksaa_version_str)
if check_update:
has_update, current_version, latest_version = check_ksaa_update_available(pip_server, ksaa_version)
if has_update:
if install_update:
print_status("更新琴音小助手", status='info')
return run_command(install_command)
else:
print_update_notice(str(current_version), str(latest_version))
else:
print_status("已是最新版本", status='success')
return True
def load_config() -> Optional[Config]:
"""
加载config.json配置文件
:return: 配置字典如果加载失败返回None
:rtype: Optional[Config]
"""
config_path = Path("./config.json")
if not config_path.exists():
msg = "配置文件 config.json 不存在,跳过配置加载"
print_status(msg, status='warning')
logging.warning(msg)
return None
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
msg = "成功加载配置文件"
print_status(msg, status='success')
logging.info(msg)
return config
except Exception as e:
msg = f"加载配置文件失败: {e}"
print_status(msg, status='error')
logging.error(msg, exc_info=True)
return None
def get_update_settings(config: Config) -> tuple[bool, bool]:
"""
从配置中获取更新设置
:param config: 配置字典
:type config: Config
:return: (是否检查更新, 是否自动安装更新)
:rtype: tuple[bool, bool]
"""
# 默认值
check_update = True
auto_install_update = True
# 检查是否有用户配置
user_configs = config.get("user_configs", [])
if user_configs:
first_config = user_configs[0]
options = first_config.get("options", {})
misc = options.get("misc", {})
# 获取检查更新设置
check_update_setting = misc.get("check_update", "startup")
check_update = check_update_setting == "startup"
# 获取自动安装更新设置
auto_install_update = misc.get("auto_install_update", True)
msg = f"更新设置: 检查更新={check_update}, 自动安装={auto_install_update}"
logging.info(msg)
return check_update, auto_install_update
def restart_as_admin() -> None:
"""
以管理员身份重启程序
"""
if is_admin():
return
script = os.path.abspath(sys.argv[0])
params = ' '.join([f'"{item}"' for item in sys.argv[1:]])
try:
# 使用 ShellExecute 以管理员身份启动程序
ret = ctypes.windll.shell32.ShellExecuteW(
None, "runas", PYTHON_EXECUTABLE, f'"{script}" {params}', None, 1
)
if ret > 32: # 返回值大于32表示成功
msg = "正在以管理员身份重启程序..."
print_status(msg, status='info')
logging.info(msg)
os._exit(0)
else:
msg = f"以管理员身份重启失败,错误码: {ret}"
print_status(msg, status='error')
logging.error(msg)
return
except Exception as e:
msg = f"以管理员身份重启时发生错误: {e}"
print_status(msg, status='error')
logging.error(msg, exc_info=True)
return
def check_admin(config: Config) -> bool:
"""
检查Windows截图权限管理员权限
:param config: 配置字典
:type config: Config
:return: 权限检查是否通过
:rtype: bool
"""
# 检查是否有用户配置
user_configs = config.get("user_configs", [])
if not user_configs:
msg = "配置文件中没有用户配置"
print_status(msg, status='warning')
logging.warning(msg)
return True # Not a fatal error, allow to continue
# 检查第一个用户配置的截图方式
first_config = user_configs[0]
backend = first_config.get("backend", {})
screenshot_impl = backend.get("screenshot_impl")
if screenshot_impl == "windows":
msg = "检测到Windows截图模式检查管理员权限..."
print_status(msg, status='info')
logging.info(msg)
if not is_admin():
msg1 = "需要管理员权限才能使用Windows截图模式"
print_status(msg1, status='error')
logging.error(msg1)
# 尝试以管理员身份重启
msg2 = "正在尝试以管理员身份重启..."
print_status(msg2, status='info', indent=1)
logging.info(msg2)
restart_as_admin()
return False
else:
msg = "管理员权限检查通过"
print_status(msg, status='success')
logging.info(msg)
return True
def run_kaa() -> bool:
"""
运行琴音小助手
:return: 运行是否成功
:rtype: bool
"""
print_header("运行琴音小助手", color=Color.GREEN)
clear_screen()
# 设置环境变量
os.environ["no_proxy"] = "localhost, 127.0.0.1, ::1"
# 运行kaa命令
if not run_command(f'"{PYTHON_EXECUTABLE}" -m kotonebot.kaa.main.cli', verbatim=True, log_output=False):
return False
print_header("运行结束", color=Color.GREEN)
return True
def parse_arguments():
"""
解析命令行参数
:return: 解析后的参数
:rtype: argparse.Namespace
"""
parser = argparse.ArgumentParser(description='琴音小助手启动器')
parser.add_argument('zip_file', nargs='?', help='要安装的 zip 文件路径(与--install-from-zip等价')
parser.add_argument('--install-version', type=str, help='安装指定版本的 ksaa (例如: --install-version=1.2.3)')
parser.add_argument('--install-from-zip', type=str, help='从 zip 文件安装 ksaa (例如: --install-from-zip=/path/to/file.zip)')
return parser.parse_args()
def main_launch():
"""
主启动函数执行完整的安装和启动流程
"""
# 解析命令行参数
args = parse_arguments()
# 处理位置参数如果提供了zip_file位置参数将其设置为install_from_zip
if args.zip_file and not args.install_from_zip:
args.install_from_zip = args.zip_file
setup_logging()
run_command("title 琴音小助手(运行时请勿关闭此窗口)", verbatim=True, log_output=False)
clear_screen()
print_header("琴音小助手启动器")
logging.info("启动器已启动。")
try:
# 1. 加载配置文件(提前加载以获取更新设置)
print_header("加载配置", color=Color.BLUE)
logging.info("加载配置。")
config = load_config()
# 2. 获取更新设置
check_update, auto_install_update = get_update_settings(config if config else {"version": 5, "user_configs": []})
# 3. 如果指定了特殊安装参数,跳过更新检查
if args.install_version or args.install_from_zip:
check_update = False
auto_install_update = False
# 4. 根据配置决定是否检查更新
print_status("正在寻找最快的 PyPI 镜像源...", status='info')
logging.info("正在寻找最快的 PyPI 镜像源...")
pip_server = get_working_pip_server()
if not pip_server:
raise RuntimeError("没有找到可用的pip服务器请检查网络连接。")
# 5. 处理特殊安装情况
if args.install_from_zip:
# 从zip文件安装
print_header("安装补丁", color=Color.BLUE)
if not install_ksaa_from_zip(args.install_from_zip):
raise RuntimeError("从zip文件安装失败请检查上面的错误日志。")
elif args.install_version:
# 安装指定版本
print_header("安装指定版本", color=Color.BLUE)
if not install_ksaa_version(pip_server, TRUSTED_HOSTS, args.install_version):
raise RuntimeError("安装指定版本失败,请检查上面的错误日志。")
else:
# 默认安装和更新逻辑
if not install_pip_and_ksaa(pip_server, check_update, auto_install_update):
raise RuntimeError("依赖安装失败,请检查上面的错误日志。")
# 6. 检查Windows截图权限
if config:
if not check_admin(config):
raise RuntimeError("权限检查失败。")
# 7. 运行琴音小助手
if not run_kaa():
raise RuntimeError("琴音小助手主程序运行失败。")
msg = "琴音小助手已退出。"
print_status(msg, status='success')
logging.info(msg)
except Exception as e:
msg = f"发生致命错误: {e}"
print_status(msg, status='error')
print_status("压缩 kaa 目录下的 logs 文件夹并给此窗口截图后一并发送给开发者", status='error')
logging.critical(msg, exc_info=True)
finally:
logging.info("启动器运行结束。")
wait_key("\n按任意键退出...")
if __name__ == "__main__":
try:
main_launch()
except KeyboardInterrupt:
print_status("运行结束。现在可以安全关闭此窗口。", status='info')

View File

@ -0,0 +1,235 @@
import re
import html.parser
import urllib.parse
from typing import List
from dataclasses import dataclass
from request import get, HTTPError, NetworkError
@dataclass
class Version:
"""版本信息"""
version_str: str
major: int = 0
minor: int = 0
patch: int = 0
prerelease: str = ""
prerelease_num: int = 0
def __post_init__(self):
"""初始化后解析版本号"""
self._parse_version()
def _parse_version(self):
"""解析版本号字符串"""
version_str = self.version_str.lower()
# 基本版本号匹配 (如 1.2.3, 1.2, 1)
version_match = re.match(r'^(\d+)(?:\.(\d+))?(?:\.(\d+))?', version_str)
if version_match:
self.major = int(version_match.group(1))
self.minor = int(version_match.group(2)) if version_match.group(2) else 0
self.patch = int(version_match.group(3)) if version_match.group(3) else 0
# 预发布版本匹配 (如 alpha1, beta2, rc3)
prerelease_match = re.search(r'(alpha|beta|rc|dev|pre|post)(\d*)', version_str)
if prerelease_match:
self.prerelease = prerelease_match.group(1)
self.prerelease_num = int(prerelease_match.group(2)) if prerelease_match.group(2) else 0
def __lt__(self, other):
"""版本比较"""
if not isinstance(other, Version):
return NotImplemented
# 比较主版本号
if self.major != other.major:
return self.major < other.major
if self.minor != other.minor:
return self.minor < other.minor
if self.patch != other.patch:
return self.patch < other.patch
# 比较预发布版本
prerelease_order = {'': 4, 'rc': 3, 'beta': 2, 'alpha': 1, 'dev': 0, 'pre': 0, 'post': 5}
self_order = prerelease_order.get(self.prerelease, 0)
other_order = prerelease_order.get(other.prerelease, 0)
if self_order != other_order:
return self_order < other_order
# 同类型预发布版本比较数字
if self.prerelease == other.prerelease:
return self.prerelease_num < other.prerelease_num
return False
def __eq__(self, other):
"""版本相等比较"""
if not isinstance(other, Version):
return NotImplemented
return (self.major == other.major and
self.minor == other.minor and
self.patch == other.patch and
self.prerelease == other.prerelease and
self.prerelease_num == other.prerelease_num)
def __repr__(self):
return f"Version('{self.version_str}')"
def __str__(self):
return self.version_str
@dataclass
class PackageVersion:
"""包版本信息"""
version: Version
url: str
class PyPIHTMLParser(html.parser.HTMLParser):
"""解析PyPI HTML响应的解析器"""
def __init__(self):
super().__init__()
self.links = []
self.current_href = None
self.current_text = None
def handle_starttag(self, tag, attrs):
if tag == 'a':
# 提取href属性
for attr_name, attr_value in attrs:
if attr_name == 'href':
self.current_href = attr_value
break
def handle_data(self, data):
if self.current_href:
self.current_text = data.strip()
def handle_endtag(self, tag):
if tag == 'a' and self.current_href and self.current_text:
self.links.append((self.current_text, self.current_href))
self.current_href = None
self.current_text = None
def normalize_package_name(package_name: str) -> str:
"""
标准化包名 _, -, . 字符视为相等
"""
return re.sub(r'[_.-]', '-', package_name.lower())
def extract_version_from_filename(filename: str) -> str:
"""
从文件名中提取版本号
例如: beautifulsoup4-4.13.0b2-py3-none-any.whl -> 4.13.0b2
"""
# 匹配版本号模式
version_pattern = r'^[^-]+-([^-]+?)(?:-py\d+)?(?:-none-any)?\.(?:whl|tar\.gz|zip)$'
match = re.match(version_pattern, filename)
if match:
return match.group(1)
# 备用模式:查找版本号
version_match = re.search(r'-(\d+\.\d+(?:\.\d+)?(?:[a-zA-Z0-9]*))', filename)
if version_match:
return version_match.group(1)
return "unknown"
def list_versions(package_name: str, *, server_url: str | None = None) -> List[PackageVersion]:
"""
获取指定包的所有可用版本按版本号降序排列
:param package_name: 包名
:type package_name: str
:param server_url: 可选的服务器URL默认为None时使用PyPI官方服务器https://pypi.org/simple
:type server_url: str | None
:return: 包含版本信息的列表按版本号降序排列
:rtype: List[PackageVersion]
:raises HTTPError: 当包不存在或网络错误时
:raises NetworkError: 当网络连接错误时
"""
# 标准化包名
normalized_name = normalize_package_name(package_name)
# 构建API URL
if server_url is None:
base_url = "https://pypi.org/simple"
else:
base_url = server_url.rstrip('/')
url = f"{base_url}/{urllib.parse.quote(normalized_name)}/"
# 设置请求头
headers = {
'Accept': 'application/vnd.pypi.simple.v1+html'
}
try:
# 发送请求
html_content = get(url, headers=headers).decode('utf-8')
# 解析HTML
parser = PyPIHTMLParser()
parser.feed(html_content)
# 处理链接并提取版本信息
versions = []
for filename, href in parser.links:
# 提取版本号
version_str = extract_version_from_filename(filename)
# 创建Version对象
version = Version(version_str)
# 创建PackageVersion对象
package_version = PackageVersion(
version=version,
url=href
)
versions.append(package_version)
# 按版本号降序排列
versions.sort(key=lambda x: x.version, reverse=True)
return versions
except HTTPError as e:
if e.code == 404:
raise ValueError(f"'{package_name}' 不存在") from e
else:
raise
def main():
"""测试函数"""
try:
# 测试获取beautifulsoup4的版本
print("获取 beautifulsoup4 的版本信息...")
versions = list_versions("beautifulsoup4")
print(f"找到 {len(versions)} 个版本:")
for i, pkg_version in enumerate(versions[:10], 1): # 只显示前10个
print(f"{i}. 版本: {pkg_version.version.version_str}")
print(f" 主版本: {pkg_version.version.major}.{pkg_version.version.minor}.{pkg_version.version.patch}")
if pkg_version.version.prerelease:
print(f" 预发布: {pkg_version.version.prerelease}{pkg_version.version.prerelease_num}")
print(f" URL: {pkg_version.url}")
print()
if len(versions) > 10:
print(f"... 还有 {len(versions) - 10} 个版本")
except Exception as e:
print(f"错误: {e}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,248 @@
import os
import urllib.parse
import urllib.error
import urllib.request
from typing import Dict, Any, Optional, Callable, TYPE_CHECKING
if TYPE_CHECKING:
from http.client import HTTPResponse
class HTTPError(Exception):
"""HTTP请求错误"""
def __init__(self, code: int, message: str):
self.code = code
self.message = message
super().__init__(f"HTTP {code}: {message}")
class NetworkError(Exception):
"""网络连接错误"""
pass
class Response:
"""HTTP响应封装"""
def __init__(self, http_response: "HTTPResponse"):
self._response = http_response
self._content: Optional[bytes] = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def close(self):
"""关闭响应和底层连接。"""
self._response.close()
@property
def status_code(self) -> int:
"""响应状态码"""
return self._response.getcode()
@property
def reason(self) -> str:
"""响应原因短语"""
return self._response.reason
@property
def headers(self) -> Dict[str, Any]:
"""响应头"""
return dict(self._response.headers)
def read(self) -> bytes:
"""读取响应内容 (bytes)"""
if self._content is None:
self._content = self._response.read()
return self._content
def json(self) -> Any:
"""将响应内容解析为JSON"""
import json
return json.loads(self.read())
def request(
url: str,
method: str = "GET",
headers: Optional[Dict[str, str]] = None,
data: Optional[bytes] = None,
timeout: Optional[float] = None
) -> Response:
"""
发送HTTP请求
:param url: 请求URL
:type url: str
:param method: HTTP方法默认为GET
:type method: str
:param headers: 请求头
:type headers: Optional[Dict[str, str]]
:param data: 请求数据
:type data: Optional[bytes]
:param timeout: 超时时间
:type timeout: Optional[float]
:return: 响应对象
:rtype: Response
:raises HTTPError: HTTP错误
:raises NetworkError: 网络连接错误
"""
# 设置默认请求头
default_headers = {
'User-Agent': 'Python-urllib/3.10'
}
if headers:
default_headers.update(headers)
# 创建请求
req = urllib.request.Request(url, data=data, headers=default_headers, method=method)
try:
# 发送请求
response = urllib.request.urlopen(req, timeout=timeout)
return Response(response)
except urllib.error.HTTPError as e:
raise HTTPError(e.code, e.reason) from e
except urllib.error.URLError as e:
raise NetworkError(f"网络连接错误: {e.reason}") from e
def get(url: str, headers: Optional[Dict[str, str]] = None, timeout: Optional[float] = None) -> bytes:
"""
发送GET请求
:param url: 请求URL
:type url: str
:param headers: 请求头
:type headers: Optional[Dict[str, str]]
:param timeout: 超时时间
:type timeout: Optional[float]
:return: 响应内容
:rtype: bytes
"""
with request(url, method="GET", headers=headers, timeout=timeout) as response:
return response.read()
def head(url: str, headers: Optional[Dict[str, str]] = None, timeout: Optional[float] = None) -> Response:
"""
发送HEAD请求
:param url: 请求URL
:type url: str
:param headers: 请求头
:type headers: Optional[Dict[str, str]]
:param timeout: 超时时间
:type timeout: Optional[float]
:return: 响应对象
:rtype: Response
:raises HTTPError: HTTP错误
:raises NetworkError: 网络连接错误
"""
return request(url, method="HEAD", headers=headers, timeout=timeout)
def download_file(
url: str,
dst_path: str,
*,
callback: Optional[Callable[[int, int], None]] = None,
timeout: Optional[float] = None
) -> None:
"""
下载文件
:param url: 文件URL
:type url: str
:param dst_path: 目标路径
:type dst_path: str
:param callback: 进度回调函数参数为(已下载字节数, 总字节数)
:type callback: Optional[Callable[[int, int], None]]
:param timeout: 超时时间
:type timeout: Optional[float]
:raises HTTPError: HTTP错误
:raises NetworkError: 网络连接错误
"""
# 设置请求头
headers = {
'User-Agent': 'Python-urllib/3.10'
}
# 创建请求
req = urllib.request.Request(url, headers=headers)
try:
# 发送请求
with urllib.request.urlopen(req, timeout=timeout) as response:
# 获取文件大小
content_length = response.headers.get('Content-Length')
total_size = int(content_length) if content_length else None
# 确保目标目录存在
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
# 下载文件
downloaded_size = 0
with open(dst_path, 'wb') as f:
while True:
chunk = response.read(8192) # 8KB chunks
if not chunk:
break
f.write(chunk)
downloaded_size += len(chunk)
# 调用进度回调
if callback and total_size:
callback(downloaded_size, total_size)
elif callback and not total_size:
callback(downloaded_size, -1) # 未知总大小
except urllib.error.HTTPError as e:
raise HTTPError(e.code, e.reason) from e
except urllib.error.URLError as e:
raise NetworkError(f"网络连接错误: {e.reason}") from e
except OSError as e:
raise NetworkError(f"文件写入错误: {e}") from e
def main():
"""测试函数"""
try:
# 测试GET请求
print("测试GET请求...")
response_bytes = get("https://httpbin.org/get")
print(f"响应长度: {len(response_bytes)} 字节")
# 测试文件下载
print("\n测试文件下载...")
def progress_callback(downloaded: int, total: int):
if total > 0:
percentage = (downloaded / total) * 100
print(f"下载进度: {downloaded}/{total} 字节 ({percentage:.1f}%)")
else:
print(f"已下载: {downloaded} 字节")
# 下载一个小文件进行测试
download_file(
"https://httpbin.org/bytes/1024",
"test_download.bin",
callback=progress_callback
)
print("下载完成!")
# 清理测试文件
if os.path.exists("test_download.bin"):
os.remove("test_download.bin")
except Exception as e:
print(f"错误: {e}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,178 @@
import os
import ctypes
import shutil
import unicodedata
from typing import Optional
def _enable_windows_ansi():
"""
On Windows, attempts to enable ANSI escape sequence processing.
"""
if os.name == 'nt':
try:
kernel32 = ctypes.windll.kernel32
handle = kernel32.GetStdHandle(-11) # STD_OUTPUT_HANDLE
mode = ctypes.c_ulong()
if kernel32.GetConsoleMode(handle, ctypes.byref(mode)) == 0:
return # Failed to get console mode
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0:
mode.value |= ENABLE_VIRTUAL_TERMINAL_PROCESSING
if kernel32.SetConsoleMode(handle, mode) == 0:
# Fallback for older systems
os.system('')
except Exception:
# Fallback for environments where ctypes fails (e.g., some IDE terminals)
os.system('')
_enable_windows_ansi()
class Color:
"""ANSI color codes"""
RESET = '\033[0m'
BOLD = '\033[1m'
BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
GRAY = '\033[90m'
def get_terminal_width(default=80):
"""Gets the width of the terminal."""
# shutil.get_terminal_size is a high-level and more robust way
return shutil.get_terminal_size((default, 24)).columns
def get_terminal_height(default=24):
"""Gets the height of the terminal."""
return shutil.get_terminal_size((default, 24)).lines
def get_display_width(s: str) -> int:
"""Calculates the display width of a string, accounting for wide characters."""
width = 0
for char in s:
# 'F' (Fullwidth), 'W' (Wide) characters take up 2 columns.
if unicodedata.east_asian_width(char) in ('F', 'W'):
width += 2
else:
width += 1
return width
def truncate_string(s: str, max_width: int) -> str:
"""Truncates a string to a maximum display width, handling wide characters."""
if not s or max_width <= 0:
return ""
width = 0
end_pos = 0
for i, char in enumerate(s):
# 'F' (Fullwidth), 'W' (Wide) characters take up 2 columns.
char_width = 2 if unicodedata.east_asian_width(char) in ('F', 'W') else 1
if width + char_width > max_width:
break
width += char_width
end_pos = i + 1
return s[:end_pos]
def hide_cursor():
"""Hides the terminal cursor."""
print('\033[?25l', end='')
def show_cursor():
"""Shows the terminal cursor."""
print('\033[?25h', end='')
def move_cursor_up(lines: int):
"""Moves the cursor up by a number of lines."""
if lines > 0:
print(f'\033[{lines}A', end='')
def clear_screen():
"""Clears the terminal screen."""
os.system('cls' if os.name == 'nt' else 'clear')
def print_header(text: str, color: str = Color.CYAN):
"""
Prints a centered header with separators that fill the terminal width.
Accounts for CJK character widths.
:param text: The text to display in the header.
:param color: ANSI color code for the header text.
"""
width = get_terminal_width()
padded_text = f" {text} "
text_display_width = get_display_width(padded_text)
# Handle cases where the text is wider than the terminal
if text_display_width >= width:
print(f"\n{Color.BOLD}{color}{text}{Color.RESET}")
return
separator_total_len = width - text_display_width
l_separator_len = separator_total_len // 2
r_separator_len = separator_total_len - l_separator_len
l_separator = "" * l_separator_len
r_separator = "" * r_separator_len
print(f"\n{Color.BOLD}{color}{l_separator}{padded_text}{r_separator}{Color.RESET}")
def print_status(message: str, success: Optional[bool] = None, status: str = 'info', indent: int = 0):
"""
Prints a status message with a symbol and color.
:param message: The status message to print.
:param success: (Deprecated) If True, sets status to 'success'; if False, 'error'.
:param status: 'success', 'error', 'warning', or 'info'.
:param indent: Number of spaces to indent.
"""
prefix = " " * indent
# Backward compatibility
if success is not None:
status = 'success' if success else 'error'
if status == 'success':
symbol = f"{Color.GREEN}"
elif status == 'error':
symbol = f"{Color.RED}"
elif status == 'warning':
symbol = f"{Color.YELLOW}"
else: # 'info'
symbol = f"{Color.BLUE}{Color.RESET}"
print(f"{prefix}[{symbol}] {message}{Color.RESET}")
def wait_key(message: str = ""):
"""
Prints a message and waits for a single key press from the user.
This is a cross-platform function.
:param message: The message to display before waiting.
"""
print(message, end="", flush=True)
if os.name == 'nt':
import msvcrt
msvcrt.getch()
else:
import sys
import tty
import termios
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
# Add a newline after the key is pressed for cleaner output
print()

418
bootstrap/kaa-wrapper/.gitignore vendored Normal file
View File

@ -0,0 +1,418 @@
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
##
## Get latest from https://github.com/github/gitignore/blob/main/VisualStudio.gitignore
# User-specific files
*.rsuser
*.suo
*.user
*.userosscache
*.sln.docstates
*.env
# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs
# Mono auto generated files
mono_crash.*
# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
[Ww][Ii][Nn]32/
[Aa][Rr][Mm]/
[Aa][Rr][Mm]64/
[Aa][Rr][Mm]64[Ee][Cc]/
bld/
[Oo]bj/
[Oo]ut/
[Ll]og/
[Ll]ogs/
# Build results on 'Bin' directories
**/[Bb]in/*
# Uncomment if you have tasks that rely on *.refresh files to move binaries
# (https://github.com/github/gitignore/pull/3736)
#!**/[Bb]in/*.refresh
# Visual Studio 2015/2017 cache/options directory
.vs/
# Uncomment if you have tasks that create the project's static files in wwwroot
#wwwroot/
# Visual Studio 2017 auto generated files
Generated\ Files/
# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
*.trx
# NUnit
*.VisualState.xml
TestResult.xml
nunit-*.xml
# Approval Tests result files
*.received.*
# Build Results of an ATL Project
[Dd]ebugPS/
[Rr]eleasePS/
dlldata.c
# Benchmark Results
BenchmarkDotNet.Artifacts/
# .NET Core
project.lock.json
project.fragment.lock.json
artifacts/
# ASP.NET Scaffolding
ScaffoldingReadMe.txt
# StyleCop
StyleCopReport.xml
# Files built by Visual Studio
*_i.c
*_p.c
*_h.h
*.ilk
*.meta
*.obj
*.idb
*.iobj
*.pch
*.pdb
*.ipdb
*.pgc
*.pgd
*.rsp
# but not Directory.Build.rsp, as it configures directory-level build defaults
!Directory.Build.rsp
*.sbr
*.tlb
*.tli
*.tlh
*.tmp
*.tmp_proj
*_wpftmp.csproj
*.log
*.tlog
*.vspscc
*.vssscc
.builds
*.pidb
*.svclog
*.scc
# Chutzpah Test files
_Chutzpah*
# Visual C++ cache files
ipch/
*.aps
*.ncb
*.opendb
*.opensdf
*.sdf
*.cachefile
*.VC.db
*.VC.VC.opendb
# Visual Studio profiler
*.psess
*.vsp
*.vspx
*.sap
# Visual Studio Trace Files
*.e2e
# TFS 2012 Local Workspace
$tf/
# Guidance Automation Toolkit
*.gpState
# ReSharper is a .NET coding add-in
_ReSharper*/
*.[Rr]e[Ss]harper
*.DotSettings.user
# TeamCity is a build add-in
_TeamCity*
# DotCover is a Code Coverage Tool
*.dotCover
# AxoCover is a Code Coverage Tool
.axoCover/*
!.axoCover/settings.json
# Coverlet is a free, cross platform Code Coverage Tool
coverage*.json
coverage*.xml
coverage*.info
# Visual Studio code coverage results
*.coverage
*.coveragexml
# NCrunch
_NCrunch_*
.NCrunch_*
.*crunch*.local.xml
nCrunchTemp_*
# MightyMoose
*.mm.*
AutoTest.Net/
# Web workbench (sass)
.sass-cache/
# Installshield output folder
[Ee]xpress/
# DocProject is a documentation generator add-in
DocProject/buildhelp/
DocProject/Help/*.HxT
DocProject/Help/*.HxC
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/Html2
DocProject/Help/html
# Click-Once directory
publish/
# Publish Web Output
*.[Pp]ublish.xml
*.azurePubxml
# Note: Comment the next line if you want to checkin your web deploy settings,
# but database connection strings (with potential passwords) will be unencrypted
*.pubxml
*.publishproj
# Microsoft Azure Web App publish settings. Comment the next line if you want to
# checkin your Azure Web App publish settings, but sensitive information contained
# in these scripts will be unencrypted
PublishScripts/
# NuGet Packages
*.nupkg
# NuGet Symbol Packages
*.snupkg
# The packages folder can be ignored because of Package Restore
**/[Pp]ackages/*
# except build/, which is used as an MSBuild target.
!**/[Pp]ackages/build/
# Uncomment if necessary however generally it will be regenerated when needed
#!**/[Pp]ackages/repositories.config
# NuGet v3's project.json files produces more ignorable files
*.nuget.props
*.nuget.targets
# Microsoft Azure Build Output
csx/
*.build.csdef
# Microsoft Azure Emulator
ecf/
rcf/
# Windows Store app package directories and files
AppPackages/
BundleArtifacts/
Package.StoreAssociation.xml
_pkginfo.txt
*.appx
*.appxbundle
*.appxupload
# Visual Studio cache files
# files ending in .cache can be ignored
*.[Cc]ache
# but keep track of directories ending in .cache
!?*.[Cc]ache/
# Others
ClientBin/
~$*
*~
*.dbmdl
*.dbproj.schemaview
*.jfm
*.pfx
*.publishsettings
orleans.codegen.cs
# Including strong name files can present a security risk
# (https://github.com/github/gitignore/pull/2483#issue-259490424)
#*.snk
# Since there are multiple workflows, uncomment next line to ignore bower_components
# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
#bower_components/
# RIA/Silverlight projects
Generated_Code/
# Backup & report files from converting an old project file
# to a newer Visual Studio version. Backup files are not needed,
# because we have git ;-)
_UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
UpgradeLog*.htm
ServiceFabricBackup/
*.rptproj.bak
# SQL Server files
*.mdf
*.ldf
*.ndf
# Business Intelligence projects
*.rdl.data
*.bim.layout
*.bim_*.settings
*.rptproj.rsuser
*- [Bb]ackup.rdl
*- [Bb]ackup ([0-9]).rdl
*- [Bb]ackup ([0-9][0-9]).rdl
# Microsoft Fakes
FakesAssemblies/
# GhostDoc plugin setting file
*.GhostDoc.xml
# Node.js Tools for Visual Studio
.ntvs_analysis.dat
node_modules/
# Visual Studio 6 build log
*.plg
# Visual Studio 6 workspace options file
*.opt
# Visual Studio 6 auto-generated workspace file (contains which files were open etc.)
*.vbw
# Visual Studio 6 auto-generated project file (contains which files were open etc.)
*.vbp
# Visual Studio 6 workspace and project file (working project files containing files to include in project)
*.dsw
*.dsp
# Visual Studio 6 technical files
*.ncb
*.aps
# Visual Studio LightSwitch build output
**/*.HTMLClient/GeneratedArtifacts
**/*.DesktopClient/GeneratedArtifacts
**/*.DesktopClient/ModelManifest.xml
**/*.Server/GeneratedArtifacts
**/*.Server/ModelManifest.xml
_Pvt_Extensions
# Paket dependency manager
**/.paket/paket.exe
paket-files/
# FAKE - F# Make
**/.fake/
# CodeRush personal settings
**/.cr/personal
# Python Tools for Visual Studio (PTVS)
**/__pycache__/
*.pyc
# Cake - Uncomment if you are using it
#tools/**
#!tools/packages.config
# Tabs Studio
*.tss
# Telerik's JustMock configuration file
*.jmconfig
# BizTalk build output
*.btp.cs
*.btm.cs
*.odx.cs
*.xsd.cs
# OpenCover UI analysis results
OpenCover/
# Azure Stream Analytics local run output
ASALocalRun/
# MSBuild Binary and Structured Log
*.binlog
MSBuild_Logs/
# AWS SAM Build and Temporary Artifacts folder
.aws-sam
# NVidia Nsight GPU debugger configuration file
*.nvuser
# MFractors (Xamarin productivity tool) working folder
**/.mfractor/
# Local History for Visual Studio
**/.localhistory/
# Visual Studio History (VSHistory) files
.vshistory/
# BeatPulse healthcheck temp database
healthchecksdb
# Backup folder for Package Reference Convert tool in Visual Studio 2017
MigrationBackup/
# Ionide (cross platform F# VS Code tools) working folder
**/.ionide/
# Fody - auto-generated XML schema
FodyWeavers.xsd
# VS Code files for those working on multiple tools
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets
# Local History for Visual Studio Code
.history/
# Built Visual Studio Code Extensions
*.vsix
# Windows Installer files from build outputs
*.cab
*.msi
*.msix
*.msm
*.msp

View File

@ -0,0 +1,18 @@
//{{NO_DEPENDENCIES}}
// Microsoft Visual C++ 生成的包含文件。
// 使用者 kaa-wrapper.rc
#define IDI_KAAWRAPPER 107
#define IDI_SMALL 108
// 新对象的下一组默认值
//
#ifdef APSTUDIO_INVOKED
#ifndef APSTUDIO_READONLY_SYMBOLS
#define _APS_NO_MFC 130
#define _APS_NEXT_RESOURCE_VALUE 129
#define _APS_NEXT_COMMAND_VALUE 32771
#define _APS_NEXT_CONTROL_VALUE 1000
#define _APS_NEXT_SYMED_VALUE 110
#endif
#endif

View File

@ -0,0 +1,10 @@
// header.h: 标准系统包含文件的包含文件,
// 或特定于项目的包含文件
//
#pragma once
#include "targetver.h"
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <tchar.h>

View File

@ -0,0 +1,88 @@
// kaa-wrapper.cpp : 定义应用程序的入口点。
//
#include "framework.h"
#include "kaa-wrapper.h"
#include <string>
int APIENTRY wWinMain(_In_ HINSTANCE hInstance,
_In_opt_ HINSTANCE hPrevInstance,
_In_ LPWSTR lpCmdLine,
_In_ int nCmdShow)
{
UNREFERENCED_PARAMETER(hInstance);
UNREFERENCED_PARAMETER(hPrevInstance);
UNREFERENCED_PARAMETER(nCmdShow);
// 设置当前目录为程序所在目录
WCHAR szPath[MAX_PATH];
if (GetModuleFileNameW(NULL, szPath, MAX_PATH) == 0) {
MessageBoxW(NULL, L"无法获取程序所在目录", L"错误", MB_OK | MB_ICONERROR);
return 1;
}
std::wstring path(szPath);
size_t pos = path.find_last_of(L"\\");
if (pos == std::wstring::npos) {
MessageBoxW(NULL, L"程序路径格式错误", L"错误", MB_OK | MB_ICONERROR);
return 1;
}
path = path.substr(0, pos);
if (!SetCurrentDirectoryW(path.c_str())) {
MessageBoxW(NULL, L"无法设置工作目录", L"错误", MB_OK | MB_ICONERROR);
return 1;
}
// 检查 Python 解释器是否存在
std::wstring pythonPath = path + L"\\WPy64-310111\\python-3.10.11.amd64\\python.exe";
if (GetFileAttributesW(pythonPath.c_str()) == INVALID_FILE_ATTRIBUTES) {
MessageBoxW(NULL, L"找不到 Python 解释器", L"错误", MB_OK | MB_ICONERROR);
return 1;
}
// 检查 bootstrap.pyz 是否存在
std::wstring bootstrapPath = path + L"\\bootstrap.pyz";
if (GetFileAttributesW(bootstrapPath.c_str()) == INVALID_FILE_ATTRIBUTES) {
MessageBoxW(NULL, L"找不到 bootstrap.pyz 文件", L"错误", MB_OK | MB_ICONERROR);
return 1;
}
// 构建命令行
std::wstring cmd = pythonPath + L" " + bootstrapPath;
// 如果有命令行参数,将其传递给 bootstrap
if (lpCmdLine && wcslen(lpCmdLine) > 0) {
cmd += L" ";
cmd += lpCmdLine;
}
// 启动信息
STARTUPINFOW si = { sizeof(si) };
PROCESS_INFORMATION pi;
// 创建进程,使用当前目录作为工作目录
if (!CreateProcessW(NULL,
const_cast<LPWSTR>(cmd.c_str()),
NULL,
NULL,
FALSE,
0,
NULL,
path.c_str(), // 设置工作目录为当前目录
&si,
&pi))
{
DWORD error = GetLastError();
WCHAR errorMsg[256];
swprintf_s(errorMsg, L"无法启动程序 (错误代码: %d)", error);
MessageBoxW(NULL, errorMsg, L"错误", MB_OK | MB_ICONERROR);
return 1;
}
// 关闭进程和线程句柄
CloseHandle(pi.hProcess);
CloseHandle(pi.hThread);
return 0;
}

View File

@ -0,0 +1,3 @@
#pragma once
#include "resource.h"

Binary file not shown.

After

Width:  |  Height:  |  Size: 264 KiB

Binary file not shown.

View File

@ -0,0 +1,31 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.10.35013.160
MinimumVisualStudioVersion = 10.0.40219.1
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "kaa-wrapper", "kaa-wrapper.vcxproj", "{F4F29940-A4DD-40C0-A433-1CF3C7B6F55C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{F4F29940-A4DD-40C0-A433-1CF3C7B6F55C}.Debug|x64.ActiveCfg = Debug|x64
{F4F29940-A4DD-40C0-A433-1CF3C7B6F55C}.Debug|x64.Build.0 = Debug|x64
{F4F29940-A4DD-40C0-A433-1CF3C7B6F55C}.Debug|x86.ActiveCfg = Debug|Win32
{F4F29940-A4DD-40C0-A433-1CF3C7B6F55C}.Debug|x86.Build.0 = Debug|Win32
{F4F29940-A4DD-40C0-A433-1CF3C7B6F55C}.Release|x64.ActiveCfg = Release|x64
{F4F29940-A4DD-40C0-A433-1CF3C7B6F55C}.Release|x64.Build.0 = Release|x64
{F4F29940-A4DD-40C0-A433-1CF3C7B6F55C}.Release|x86.ActiveCfg = Release|Win32
{F4F29940-A4DD-40C0-A433-1CF3C7B6F55C}.Release|x86.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {304B08B9-3494-48C3-94A5-9486F9D16B10}
EndGlobalSection
EndGlobal

View File

@ -0,0 +1,148 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x64">
<Configuration>Release</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<VCProjectVersion>17.0</VCProjectVersion>
<Keyword>Win32Proj</Keyword>
<ProjectGuid>{f4f29940-a4dd-40c0-a433-1cf3c7b6f55c}</ProjectGuid>
<RootNamespace>kaawrapper</RootNamespace>
<WindowsTargetPlatformVersion>10.0</WindowsTargetPlatformVersion>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v143</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v143</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v143</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v143</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="Shared">
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;_WINDOWS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>NDEBUG;_WINDOWS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClInclude Include="framework.h" />
<ClInclude Include="kaa-wrapper.h" />
<ClInclude Include="Resource.h" />
<ClInclude Include="targetver.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="kaa-wrapper.cpp" />
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="kaa-wrapper.rc" />
</ItemGroup>
<ItemGroup>
<Image Include="kaa-wrapper.ico" />
<Image Include="small.ico" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>

View File

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<Filter Include="源文件">
<UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier>
<Extensions>cpp;c;cc;cxx;c++;cppm;ixx;def;odl;idl;hpj;bat;asm;asmx</Extensions>
</Filter>
<Filter Include="头文件">
<UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier>
<Extensions>h;hh;hpp;hxx;h++;hm;inl;inc;ipp;xsd</Extensions>
</Filter>
<Filter Include="资源文件">
<UniqueIdentifier>{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}</UniqueIdentifier>
<Extensions>rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms</Extensions>
</Filter>
</ItemGroup>
<ItemGroup>
<ClInclude Include="framework.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="targetver.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="Resource.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="kaa-wrapper.h">
<Filter>头文件</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="kaa-wrapper.cpp">
<Filter>源文件</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="kaa-wrapper.rc">
<Filter>资源文件</Filter>
</ResourceCompile>
</ItemGroup>
<ItemGroup>
<Image Include="small.ico">
<Filter>资源文件</Filter>
</Image>
<Image Include="kaa-wrapper.ico">
<Filter>资源文件</Filter>
</Image>
</ItemGroup>
</Project>

Binary file not shown.

After

Width:  |  Height:  |  Size: 264 KiB

View File

@ -0,0 +1,6 @@
#pragma once
// // 包含 SDKDDKVer.h 可定义可用的最高版本的 Windows 平台。
// 如果希望为之前的 Windows 平台构建应用程序,在包含 SDKDDKVer.h 之前请先包含 WinSDKVer.h 并
// 将 _WIN32_WINNT 宏设置为想要支持的平台。
#include <SDKDDKVer.h>

View File

@ -1,41 +0,0 @@
@echo off
call WPy64-310111\scripts\env.bat
if errorlevel 1 (
goto ERROR
)
set PIP_EXTRA_INDEX_URL=https://mirrors.cloud.tencent.com/pypi/simple/ http://mirrors.aliyun.com/pypi/simple/
echo =========== 安装与更新 KAA ===========
:INSTALL
echo 检查 pip
python -m pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple --upgrade pip
if errorlevel 1 (
goto ERROR
)
pip config set global.trusted-host "pypi.org files.pythonhosted.org pypi.python.org mirrors.aliyun.com mirrors.cloud.tencent.com mirrors.tuna.tsinghua.edu.cn"
pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
echo 安装 ksaa
pip install --upgrade ksaa
if errorlevel 1 (
goto ERROR
)
echo =========== 当前版本 ===========
pip show ksaa
echo =========== 运行 KAA ===========
:RUN
set no_proxy=localhost, 127.0.0.1, ::1
kaa
if errorlevel 1 (
goto ERROR
)
echo =========== 运行结束 ===========
pause
exit /b 0
:ERROR
echo 发生错误,程序退出
pause
exit /b 1

View File

@ -1,54 +0,0 @@
@echo off
cd /D %~dp0
REM https://superuser.com/questions/788924/is-it-possible-to-automatically-run-a-batch-file-as-administrator
REM --> Check for permissions
>nul 2>&1 "%SYSTEMROOT%\system32\cacls.exe" "%SYSTEMROOT%\system32\config\system"
REM --> If error flag set, we do not have admin.
if '%errorlevel%' NEQ '0' (
echo 需要以管理员身份运行。右键此脚本,选择“以管理员身份运行”。
pause
exit /b 1
)
call WPy64-310111\scripts\env.bat
if errorlevel 1 (
goto ERROR
)
set PIP_EXTRA_INDEX_URL=https://mirrors.cloud.tencent.com/pypi/simple/ http://mirrors.aliyun.com/pypi/simple/
echo =========== 安装与更新 KAA ===========
:INSTALL
echo 检查 pip
python -m pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple --upgrade pip
if errorlevel 1 (
goto ERROR
)
pip config set global.trusted-host "pypi.org files.pythonhosted.org pypi.python.org mirrors.aliyun.com mirrors.cloud.tencent.com mirrors.tuna.tsinghua.edu.cn"
pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
echo 安装 ksaa
pip install --upgrade ksaa
if errorlevel 1 (
goto ERROR
)
echo =========== 当前版本 ===========
pip show ksaa
echo =========== 运行 KAA ===========
:RUN
set no_proxy=localhost, 127.0.0.1, ::1
kaa
if errorlevel 1 (
goto ERROR
)
echo =========== 运行结束 ===========
pause
exit /b 0
:ERROR
echo 发生错误,程序退出
pause
exit /b 1

View File

@ -1,40 +0,0 @@
@echo off
call WPy64-310111\scripts\env.bat
if errorlevel 1 (
goto ERROR
)
if "%~1"=="" (
echo 请将 Python 包文件拖到此脚本上
pause
exit /b 1
)
echo =========== 卸载原有包 ===========
pip uninstall -y ksaa
pip uninstall -y ksaa_res
if errorlevel 1 (
goto ERROR
)
:INSTALL_LOOP
if "%~1"=="" goto INSTALL_DONE
echo =========== 安装 %~1 ===========
pip install "%~1"
if errorlevel 1 (
goto ERROR
)
shift
goto INSTALL_LOOP
:INSTALL_DONE
echo =========== 安装完成 ===========
pause
exit /b 0
:ERROR
echo 发生错误,程序退出
pause
exit /b 1

View File

@ -41,10 +41,6 @@ env: fetch-submodule
}
python tools/make_resources.py
# Build the project using pyinstaller
build: env
pyinstaller -y kotonebot-gr.spec
generate-metadata: env
#!{{shebang_python}}
# 更新日志
@ -124,3 +120,21 @@ publish-test: package
#
build-bootstrap:
#!{{shebang_pwsh}}
echo "Building bootstrap..."
# 构建 Python
cd bootstrap
python -m zipapp kaa-bootstrap
mv kaa-bootstrap.pyz ../dist/bootstrap.pyz -fo
# 构建 C++
$msbuild = &"${env:ProgramFiles(x86)}\Microsoft Visual Studio\Installer\vswhere.exe" -latest -prerelease -products * -requires Microsoft.Component.MSBuild -find MSBuild\**\Bin\MSBuild.exe
if ($msbuild) {
& $msbuild kaa-wrapper/kaa-wrapper.sln /p:Configuration=Release
mv kaa-wrapper/x64/Release/kaa-wrapper.exe ../dist/kaa.exe -fo
} else {
Write-Host "MSBuild not found. Please install Visual Studio or build kaa-wrapper manually."
}
# Build kaa and bootstrap
build: package build-bootstrap

View File

@ -461,6 +461,21 @@ class EndGameConfig(ConfigBaseModel):
目前仅对 DMM 版有效
"""
class MiscConfig(ConfigBaseModel):
check_update: Literal['never', 'startup'] = 'startup'
"""
检查更新时机
* never: 从不检查更新
* startup: 启动时检查更新
"""
auto_install_update: bool = True
"""
是否自动安装更新
若启用则每次自动检查更新时若有新版本会自动安装否则只是会提示
"""
class BaseConfig(ConfigBaseModel):
purchase: PurchaseConfig = PurchaseConfig()
"""商店购买配置"""
@ -501,6 +516,9 @@ class BaseConfig(ConfigBaseModel):
end_game: EndGameConfig = EndGameConfig()
"""关闭游戏配置"""
misc: MiscConfig = MiscConfig()
"""杂项配置"""
def conf() -> BaseConfig:
"""获取当前配置数据"""

View File

@ -3,6 +3,9 @@ import traceback
import zipfile
import logging
import copy
import sys
import subprocess
import json
from functools import partial
from itertools import chain
from datetime import datetime, timedelta
@ -22,7 +25,7 @@ from kotonebot.kaa.common import (
BaseConfig, APShopItems, CapsuleToysConfig, ClubRewardConfig, PurchaseConfig, ActivityFundsConfig,
PresentsConfig, AssignmentConfig, ContestConfig, ProduceConfig,
MissionRewardConfig, DailyMoneyShopItems, ProduceAction,
RecommendCardDetectionMode, TraceConfig, StartGameConfig, EndGameConfig, UpgradeSupportCardConfig,
RecommendCardDetectionMode, TraceConfig, StartGameConfig, EndGameConfig, UpgradeSupportCardConfig, MiscConfig,
)
logger = logging.getLogger(__name__)
@ -88,6 +91,9 @@ ConfigKey = Literal[
'presents_enabled',
'trace_recommend_card_detection',
# misc
'check_update', 'auto_install_update',
'_selected_backend_index'
]
@ -1469,6 +1475,35 @@ class KotoneBotUI:
'trace_recommend_card_detection': trace_recommend_card_detection
}
def _create_misc_settings(self) -> ConfigBuilderReturnValue:
with gr.Column():
gr.Markdown("### 杂项设置")
check_update = gr.Dropdown(
choices=[
("启动时检查更新", "startup"),
("从不检查更新", "never")
],
value=self.current_config.options.misc.check_update,
label="检查更新时机",
info=MiscConfig.model_fields['check_update'].description,
interactive=True
)
auto_install_update = gr.Checkbox(
label="自动安装更新",
value=self.current_config.options.misc.auto_install_update,
info=MiscConfig.model_fields['auto_install_update'].description,
interactive=True
)
def set_config(config: BaseConfig, data: dict[ConfigKey, Any]) -> None:
config.misc.check_update = data['check_update']
config.misc.auto_install_update = data['auto_install_update']
return set_config, {
'check_update': check_update,
'auto_install_update': auto_install_update
}
def _create_settings_tab(self) -> None:
with gr.Tab("设置"):
gr.Markdown("## 设置")
@ -1506,6 +1541,9 @@ class KotoneBotUI:
# 跟踪设置
trace_settings = self._create_trace_settings()
# 杂项设置
misc_settings = self._create_misc_settings()
# 启动游戏设置
start_game_settings = self._create_start_game_settings()
@ -1529,7 +1567,8 @@ class KotoneBotUI:
capsule_toys_settings,
start_game_settings,
end_game_settings,
trace_settings
trace_settings,
misc_settings
] # list of (set_func, { 'key': component, ... })
all_components = [list(ret[1].values()) for ret in all_return_values] # [[c1, c2], [c3], ...]
all_components = list(chain(*all_components)) # [c1, c2, c3, ...]
@ -1571,10 +1610,224 @@ class KotoneBotUI:
)
def _create_whats_new_tab(self) -> None:
"""创建更新日志标签页,并显示最新版本更新内容"""
with gr.Tab("更新日志"):
from kotonebot.kaa.metadata import WHATS_NEW
gr.Markdown(WHATS_NEW)
"""创建更新标签页"""
with gr.Tab("更新"):
gr.Markdown("## 版本管理")
# 更新日志
with gr.Accordion("更新日志", open=False):
from kotonebot.kaa.metadata import WHATS_NEW
gr.Markdown(WHATS_NEW)
# 载入信息按钮
load_info_btn = gr.Button("载入信息", variant="primary")
# 状态信息
status_text = gr.Markdown("")
# 版本选择下拉框(用于安装)
version_dropdown = gr.Dropdown(
label="选择要安装的版本",
choices=[],
value=None,
visible=False,
interactive=True
)
# 安装选定版本按钮
install_selected_btn = gr.Button("安装选定版本", visible=False)
def list_all_versions():
"""列出所有可用版本"""
import logging
logger = logging.getLogger(__name__)
try:
# 构建命令,使用清华镜像源
cmd = [
sys.executable, "-m", "pip", "index", "versions", "ksaa", "--json",
"--index-url", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple",
"--trusted-host", "mirrors.tuna.tsinghua.edu.cn"
]
logger.info(f"执行命令: {' '.join(cmd)}")
# 使用 pip index versions --json 来获取版本信息
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
logger.info(f"命令返回码: {result.returncode}")
if result.stdout:
logger.info(f"命令输出: {result.stdout[:500]}...") # 只记录前500字符
if result.stderr:
logger.warning(f"命令错误输出: {result.stderr}")
if result.returncode != 0:
error_msg = f"获取版本列表失败: {result.stderr}"
logger.error(error_msg)
return (
error_msg,
gr.Button(value="载入信息", interactive=True),
gr.Dropdown(visible=False),
gr.Button(visible=False)
)
# 解析 JSON 输出
try:
data = json.loads(result.stdout)
versions = data.get("versions", [])
latest_version = data.get("latest", "")
installed_version = data.get("installed_version", "")
logger.info(f"解析到 {len(versions)} 个版本")
logger.info(f"最新版本: {latest_version}")
logger.info(f"已安装版本: {installed_version}")
except json.JSONDecodeError as e:
error_msg = f"解析版本信息失败: {str(e)}"
logger.error(error_msg)
return (
error_msg,
gr.Button(value="载入信息", interactive=True),
gr.Dropdown(visible=False),
gr.Button(visible=False)
)
if not versions:
error_msg = "未找到可用版本"
logger.warning(error_msg)
return (
error_msg,
gr.Button(value="载入信息", interactive=True),
gr.Dropdown(visible=False),
gr.Button(visible=False)
)
# 构建状态信息
status_info = []
if installed_version:
status_info.append(f"**当前安装版本:** {installed_version}")
if latest_version:
status_info.append(f"**最新版本:** {latest_version}")
status_info.append(f"**找到 {len(versions)} 个可用版本**")
status_message = "\n\n".join(status_info)
logger.info(f"版本信息载入完成: {status_message}")
# 返回更新后的组件
return (
status_message,
gr.Button(value="载入信息", interactive=True),
gr.Dropdown(choices=versions, value=versions[0] if versions else None, visible=True, label="选择要安装的版本"),
gr.Button(visible=True, value="安装选定版本")
)
except subprocess.TimeoutExpired:
error_msg = "获取版本列表超时"
logger.error(error_msg)
return (
error_msg,
gr.Button(value="载入信息", interactive=True),
gr.Dropdown(visible=False),
gr.Button(visible=False)
)
except Exception as e:
error_msg = f"获取版本列表失败: {str(e)}"
logger.error(error_msg)
return (
error_msg,
gr.Button(value="载入信息", interactive=True),
gr.Dropdown(visible=False),
gr.Button(visible=False)
)
def install_selected_version(selected_version: str):
"""安装选定的版本"""
import logging
import threading
import time
logger = logging.getLogger(__name__)
if not selected_version:
error_msg = "请先选择一个版本"
logger.warning(error_msg)
return error_msg
def install_and_exit():
"""在后台线程中执行安装并退出程序"""
try:
# 等待一小段时间确保UI响应已返回
time.sleep(1)
# 构建启动器命令
bootstrap_path = os.path.join(os.getcwd(), "bootstrap.pyz")
cmd = [sys.executable, bootstrap_path, f"--install-version={selected_version}"]
logger.info(f"开始通过启动器安装版本 {selected_version}")
logger.info(f"执行命令: {' '.join(cmd)}")
# 启动启动器进程(不等待完成)
subprocess.Popen(
cmd,
cwd=os.getcwd(),
creationflags=subprocess.CREATE_NEW_CONSOLE if os.name == 'nt' else 0
)
# 等待一小段时间确保启动器启动
time.sleep(2)
# 退出当前程序
logger.info("安装即将开始,正在退出当前程序...")
os._exit(0)
except Exception as e:
raise
try:
# 在后台线程中执行安装和退出
install_thread = threading.Thread(target=install_and_exit, daemon=True)
install_thread.start()
return f"正在启动器中安装版本 {selected_version},程序将自动重启..."
except Exception as e:
error_msg = f"启动安装进程失败: {str(e)}"
logger.error(error_msg)
return error_msg
def load_info_with_button_state():
"""载入信息并管理按钮状态"""
import logging
logger = logging.getLogger(__name__)
logger.info("开始载入版本信息")
# 先禁用按钮
yield (
"正在载入版本信息...",
gr.Button(value="载入中...", interactive=False),
gr.Dropdown(visible=False),
gr.Button(visible=False)
)
# 执行载入操作
result = list_all_versions()
logger.info("版本信息载入操作完成")
yield result
# 绑定事件
load_info_btn.click(
fn=load_info_with_button_state,
outputs=[status_text, load_info_btn, version_dropdown, install_selected_btn]
)
install_selected_btn.click(
fn=install_selected_version,
inputs=[version_dropdown],
outputs=[status_text]
)
def _create_screen_tab(self) -> None:
with gr.Tab("画面"):