feat(bootstrap): 新启动器

This commit is contained in:
XcantloadX 2025-06-29 16:47:16 +08:00
parent 02860b6014
commit c4b93f40d6
8 changed files with 1176 additions and 95 deletions

View File

@ -0,0 +1,7 @@
from terminal import print_status
from launcher import main_launch
try:
main_launch()
except KeyboardInterrupt:
print_status("运行结束", status='info')

View File

@ -0,0 +1,502 @@
import importlib.metadata
import json
import logging
import os
import sys
import subprocess
import ctypes
import codecs
import locale
from typing import Optional, Dict, Any
from pathlib import Path
from collections import deque
from datetime import datetime
from request import head, HTTPError, NetworkError
from terminal import (
Color, print_header, print_status, clear_screen,
get_terminal_width, get_display_width, truncate_string,
hide_cursor, show_cursor, move_cursor_up, wait_key, get_terminal_height
)
# 获取当前Python解释器路径
python_executable = sys.executable
def setup_logging():
"""
配置日志记录
"""
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%y-%m-%d-%H-%M-%S")
log_file = log_dir / f"bootstrap-{timestamp}.log"
logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d] %(message)s',
filename=log_file,
filemode='w',
encoding='utf-8'
)
# 记录未捕获的异常
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logging.error("未捕获的异常", exc_info=(exc_type, exc_value, exc_traceback))
sys.excepthook = handle_exception
logging.info("日志记录器已初始化。")
PIP_SERVERS = [
"https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple",
"https://mirrors.aliyun.com/pypi/simple",
"https://mirrors.cloud.tencent.com/pypi/simple",
"https://pypi.org/simple",
]
def is_admin() -> bool:
"""
检查当前进程是否具有管理员权限
:return: 如果具有管理员权限返回True否则返回False
:rtype: bool
"""
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
def test_url_availability(url: str) -> bool:
"""
测试URL是否可访问返回200状态码
:param url: 要测试的URL
:type url: str
:return: 如果URL可访问返回True否则返回False
:rtype: bool
"""
try:
with head(url, timeout=10) as response:
return response.status_code == 200
except (HTTPError, NetworkError):
return False
except Exception:
return False
def get_working_pip_server() -> Optional[str]:
"""
获取可用的pip服务器
:return: 第一个可用的pip服务器URL如果都不可用返回None
:rtype: Optional[str]
"""
for server in PIP_SERVERS:
msg = f"正在测试: {server}"
print_status(msg, status='info', indent=1)
logging.info(msg)
if test_url_availability(server):
msg = f"找到可用的pip服务器: {server}"
print_status(msg, status='success', indent=1)
logging.info(msg)
return server
msg = "所有pip服务器都不可用"
print_status(msg, status='error')
logging.error(msg)
return None
def package_version(package_name: str) -> Optional[str]:
"""
获取指定包的版本信息
:param package_name: 包名称
:type package_name: str
:return: 包版本字符串如果包不存在则返回 None
:rtype: Optional[str]
:Example:
.. code-block:: python
>>> package_version("requests")
'2.31.0'
>>> package_version("nonexistent_package")
None
:raises: 无异常抛出包不存在时返回 None
"""
try:
return importlib.metadata.version(package_name)
except importlib.metadata.PackageNotFoundError:
return None
def run_command(command: str, check: bool = True, verbatim: bool = False, scroll_region_size: int = -1, log_output: bool = True) -> bool:
"""
运行命令并实时输出返回是否成功
:param command: 要运行的命令
:param check: 是否检查返回码
:param verbatim: 是否原样输出不使用滚动UI
:param scroll_region_size: 滚动区域的大小, -1 表示动态计算
:param log_output: 是否将命令输出记录到日志中
:return: 命令是否成功执行
"""
logging.info(f"执行命令: {command}")
# 设置环境变量以确保正确的编码处理
env = os.environ.copy()
env["PYTHONIOENCODING"] = "utf-8"
# 获取系统默认编码
system_encoding = locale.getpreferredencoding()
# 创建解码器
def decode_output(line: bytes) -> str:
try:
# 首先尝试UTF-8解码
return line.decode('utf-8')
except UnicodeDecodeError:
try:
# 如果UTF-8失败尝试系统默认编码
return line.decode(system_encoding)
except UnicodeDecodeError:
# 如果都失败了,使用'replace'策略
return line.decode('utf-8', errors='replace')
if verbatim:
print(f"▶ 执行命令: {command}")
try:
process = subprocess.Popen(
command, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
env=env
)
if process.stdout:
for line in iter(process.stdout.readline, b''):
clean_line = decode_output(line).strip('\n')
print(clean_line)
if log_output:
logging.info(clean_line)
returncode = process.wait()
logging.info(f"命令执行完毕,返回码: {returncode}")
return returncode == 0
except FileNotFoundError:
msg = f"命令未找到: {command.split()[0]}"
print_status(msg, status='error', indent=1)
logging.error(msg)
return False
except Exception as e:
msg = f"命令执行时发生错误: {e}"
print_status(msg, status='error', indent=1)
logging.error(msg, exc_info=True)
return False
# --- 滚动UI模式 ---
if scroll_region_size == -1:
# Heuristic: leave some lines for context above and below.
# Use at least 5 lines.
SCROLL_REGION_SIZE = max(5, get_terminal_height() - 8)
else:
SCROLL_REGION_SIZE = scroll_region_size
terminal_width = get_terminal_width()
# 打印初始状态行
prefix = ""
prefix_width = 2 # "▶ "
available_width = terminal_width - prefix_width
command_text = f"执行命令: {command}"
truncated_command = truncate_string(command_text, available_width)
padding_len = available_width - get_display_width(truncated_command)
padding = ' ' * max(0, padding_len)
print(f"{Color.GRAY}{prefix}{truncated_command}{padding}{Color.RESET}")
process = subprocess.Popen(
command, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
env=env
)
output_buffer = deque(maxlen=SCROLL_REGION_SIZE)
lines_rendered = 0
hide_cursor()
try:
if process.stdout:
for line in iter(process.stdout.readline, b''):
stripped_line = decode_output(line).strip()
output_buffer.append(stripped_line)
if log_output:
logging.info(stripped_line)
# 移动光标到绘制区域顶部
if lines_rendered > 0:
move_cursor_up(lines_rendered)
lines_rendered = min(len(output_buffer), SCROLL_REGION_SIZE)
# 重新绘制滚动区域
lines_to_render = list(output_buffer)
for i in range(lines_rendered):
line_to_print = lines_to_render[i] if i < len(lines_to_render) else ""
prefix = f"{Color.GRAY}|{Color.RESET} "
prefix_width = 2
available_width = terminal_width - prefix_width
truncated = truncate_string(line_to_print, available_width)
padding_len = available_width - get_display_width(truncated)
padding = ' ' * max(0, padding_len)
# 使用 \r 和 \n 刷新行
print(f"\r{prefix}{truncated}{padding}")
returncode = process.wait()
logging.info(f"命令执行完毕,返回码: {returncode}")
finally:
show_cursor()
# 清理滚动区域
if lines_rendered > 0:
move_cursor_up(lines_rendered)
for _ in range(lines_rendered):
print(' ' * terminal_width)
move_cursor_up(lines_rendered)
# 更新最终状态行
move_cursor_up(1)
if returncode == 0:
final_symbol = f"{Color.GREEN}"
success = True
else:
final_symbol = f"{Color.RED}"
success = False
# 重新计算填充以确保行被完全覆盖
final_prefix = f"{final_symbol} "
final_prefix_width = 2 # "✓ " or "✗ "
available_width = terminal_width - final_prefix_width
final_line_text = f"执行命令: {command}"
truncated_final_line = truncate_string(final_line_text, available_width)
padding_len = available_width - get_display_width(truncated_final_line)
padding = ' ' * max(0, padding_len)
print(f"\r{final_prefix}{truncated_final_line}{Color.RESET}{padding}")
if check and not success:
msg = f"命令执行失败,返回码: {returncode}"
print_status(msg, status='error', indent=1)
logging.error(msg)
return False
return success
def install_pip_and_ksaa(pip_server: str) -> bool:
"""
安装和更新pip以及ksaa包
:param pip_server: pip服务器URL
:type pip_server: str
:return: 安装是否成功
:rtype: bool
"""
print_header("安装与更新依赖", color=Color.BLUE)
# 定义信任的主机列表
trusted_hosts = "pypi.org files.pythonhosted.org pypi.python.org mirrors.aliyun.com mirrors.cloud.tencent.com mirrors.tuna.tsinghua.edu.cn"
# 升级pip
print_status("检查并更新 pip", status='info')
upgrade_pip_command = f'"{python_executable}" -m pip install -i {pip_server} --trusted-host "{trusted_hosts}" --upgrade pip'
if not run_command(upgrade_pip_command):
return False
# 安装ksaa通过命令行参数传递配置
print_status("安装或更新 ksaa", status='info')
install_command = f'"{python_executable}" -m pip install --upgrade --index-url {pip_server} --trusted-host "{trusted_hosts}" ksaa'
if not run_command(install_command):
return False
return True
def load_config() -> Optional[Dict[str, Any]]:
"""
加载config.json配置文件
:return: 配置字典如果加载失败返回None
:rtype: Optional[Dict[str, Any]]
"""
config_path = Path("./config.json")
if not config_path.exists():
msg = f"配置文件 config.json 不存在,跳过配置加载"
print_status(msg, status='warning')
logging.warning(msg)
return None
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
msg = "成功加载配置文件"
print_status(msg, status='success')
logging.info(msg)
return config
except Exception as e:
msg = f"加载配置文件失败: {e}"
print_status(msg, status='error')
logging.error(msg, exc_info=True)
return None
def restart_as_admin() -> None:
"""
以管理员身份重启程序
"""
if is_admin():
return
script = os.path.abspath(sys.argv[0])
params = ' '.join([f'"{item}"' for item in sys.argv[1:]])
try:
# 使用 ShellExecute 以管理员身份启动程序
ret = ctypes.windll.shell32.ShellExecuteW(
None, "runas", python_executable, f'"{script}" {params}', None, 1
)
if ret > 32: # 返回值大于32表示成功
msg = "正在以管理员身份重启程序..."
print_status(msg, status='info')
logging.info(msg)
os._exit(0)
else:
msg = f"以管理员身份重启失败,错误码: {ret}"
print_status(msg, status='error')
logging.error(msg)
return
except Exception as e:
msg = f"以管理员身份重启时发生错误: {e}"
print_status(msg, status='error')
logging.error(msg, exc_info=True)
return
def check_admin(config: Dict[str, Any]) -> bool:
"""
检查Windows截图权限管理员权限
:param config: 配置字典
:type config: Dict[str, Any]
:return: 权限检查是否通过
:rtype: bool
"""
# 检查是否有用户配置
if not config.get("user_configs"):
msg = "配置文件中没有用户配置"
print_status(msg, status='warning')
logging.warning(msg)
return True # Not a fatal error, allow to continue
# 检查第一个用户配置的截图方式
first_config = config["user_configs"][0]
backend = first_config.get("backend", {})
screenshot_impl = backend.get("screenshot_impl")
if screenshot_impl == "windows":
msg = "检测到Windows截图模式检查管理员权限..."
print_status(msg, status='info')
logging.info(msg)
if not is_admin():
msg1 = "需要管理员权限才能使用Windows截图模式"
print_status(msg1, status='error')
logging.error(msg1)
# 尝试以管理员身份重启
msg2 = "正在尝试以管理员身份重启..."
print_status(msg2, status='info', indent=1)
logging.info(msg2)
restart_as_admin()
return False
else:
msg = "管理员权限检查通过"
print_status(msg, status='success')
logging.info(msg)
return True
def run_kaa() -> bool:
"""
运行KAA程序
:return: 运行是否成功
:rtype: bool
"""
print_header("运行 KAA", color=Color.GREEN)
clear_screen()
# 设置环境变量
os.environ["no_proxy"] = "localhost, 127.0.0.1, ::1"
# 运行kaa命令
if not run_command(f'"{python_executable}" -m kotonebot.kaa.main.cli', verbatim=True, log_output=False):
return False
print_header("运行结束", color=Color.GREEN)
return True
def main_launch():
"""
主启动函数执行完整的安装和启动流程
"""
setup_logging()
run_command("title 琴音小助手(运行时请勿关闭此窗口)", verbatim=True, log_output=False)
clear_screen()
print_header("琴音小助手启动器")
logging.info("启动器已启动。")
try:
# 1. 获取可用的pip服务器
print_status("正在寻找最快的 PyPI 镜像源...", status='info')
logging.info("正在寻找最快的 PyPI 镜像源...")
pip_server = get_working_pip_server()
if not pip_server:
raise RuntimeError("没有找到可用的pip服务器请检查网络连接。")
# 2. 安装和更新pip以及ksaa包
if not install_pip_and_ksaa(pip_server):
raise RuntimeError("依赖安装失败,请检查上面的错误日志。")
# 3. 加载配置文件
print_header("加载配置", color=Color.BLUE)
logging.info("加载配置。")
config = load_config()
if config:
# 4. 检查Windows截图权限
if not check_admin(config):
raise RuntimeError("权限检查失败。")
# 5. 运行KAA
if not run_kaa():
raise RuntimeError("KAA 主程序运行失败。")
msg = "KAA 已成功运行并退出。"
print_status(msg, status='success')
logging.info(msg)
except Exception as e:
msg = f"发生致命错误: {e}"
print_status(msg, status='error')
print_status("压缩 kaa 目录下的 logs 文件夹并给此窗口截图后一并发送给开发者", status='error')
logging.critical(msg, exc_info=True)
finally:
logging.info("启动器运行结束。")
wait_key("\n按任意键退出...")
if __name__ == "__main__":
try:
main_launch()
except KeyboardInterrupt:
print_status("运行结束", status='info')

View File

@ -0,0 +1,233 @@
import re
import html.parser
import urllib.parse
from typing import List
from dataclasses import dataclass
from request import get, HTTPError, NetworkError
@dataclass
class Version:
"""版本信息"""
version_str: str
major: int = 0
minor: int = 0
patch: int = 0
prerelease: str = ""
prerelease_num: int = 0
def __post_init__(self):
"""初始化后解析版本号"""
self._parse_version()
def _parse_version(self):
"""解析版本号字符串"""
version_str = self.version_str.lower()
# 基本版本号匹配 (如 1.2.3, 1.2, 1)
version_match = re.match(r'^(\d+)(?:\.(\d+))?(?:\.(\d+))?', version_str)
if version_match:
self.major = int(version_match.group(1))
self.minor = int(version_match.group(2)) if version_match.group(2) else 0
self.patch = int(version_match.group(3)) if version_match.group(3) else 0
# 预发布版本匹配 (如 alpha1, beta2, rc3)
prerelease_match = re.search(r'(alpha|beta|rc|dev|pre|post)(\d*)', version_str)
if prerelease_match:
self.prerelease = prerelease_match.group(1)
self.prerelease_num = int(prerelease_match.group(2)) if prerelease_match.group(2) else 0
def __lt__(self, other):
"""版本比较"""
if not isinstance(other, Version):
return NotImplemented
# 比较主版本号
if self.major != other.major:
return self.major < other.major
if self.minor != other.minor:
return self.minor < other.minor
if self.patch != other.patch:
return self.patch < other.patch
# 比较预发布版本
prerelease_order = {'': 4, 'rc': 3, 'beta': 2, 'alpha': 1, 'dev': 0, 'pre': 0, 'post': 5}
self_order = prerelease_order.get(self.prerelease, 0)
other_order = prerelease_order.get(other.prerelease, 0)
if self_order != other_order:
return self_order < other_order
# 同类型预发布版本比较数字
if self.prerelease == other.prerelease:
return self.prerelease_num < other.prerelease_num
return False
def __eq__(self, other):
"""版本相等比较"""
if not isinstance(other, Version):
return NotImplemented
return (self.major == other.major and
self.minor == other.minor and
self.patch == other.patch and
self.prerelease == other.prerelease and
self.prerelease_num == other.prerelease_num)
def __repr__(self):
return f"Version('{self.version_str}')"
@dataclass
class PackageVersion:
"""包版本信息"""
version: Version
url: str
class PyPIHTMLParser(html.parser.HTMLParser):
"""解析PyPI HTML响应的解析器"""
def __init__(self):
super().__init__()
self.links = []
self.current_href = None
self.current_text = None
def handle_starttag(self, tag, attrs):
if tag == 'a':
# 提取href属性
for attr_name, attr_value in attrs:
if attr_name == 'href':
self.current_href = attr_value
break
def handle_data(self, data):
if self.current_href:
self.current_text = data.strip()
def handle_endtag(self, tag):
if tag == 'a' and self.current_href and self.current_text:
self.links.append((self.current_text, self.current_href))
self.current_href = None
self.current_text = None
def normalize_package_name(package_name: str) -> str:
"""
标准化包名 _, -, . 字符视为相等
"""
return re.sub(r'[_.-]', '-', package_name.lower())
def extract_version_from_filename(filename: str) -> str:
"""
从文件名中提取版本号
例如: beautifulsoup4-4.13.0b2-py3-none-any.whl -> 4.13.0b2
"""
# 匹配版本号模式
version_pattern = r'^[^-]+-([^-]+?)(?:-py\d+)?(?:-none-any)?\.(?:whl|tar\.gz|zip)$'
match = re.match(version_pattern, filename)
if match:
return match.group(1)
# 备用模式:查找版本号
version_match = re.search(r'-(\d+\.\d+(?:\.\d+)?(?:[a-zA-Z0-9]*))', filename)
if version_match:
return version_match.group(1)
return "unknown"
def list_versions(package_name: str, *, server_url: str | None = None) -> List[PackageVersion]:
"""
获取指定包的所有可用版本按版本号降序排列
:param package_name: 包名
:type package_name: str
:param server_url: 可选的服务器URL默认为None时使用PyPI官方服务器https://pypi.org/simple
:type server_url: str | None
:return: 包含版本信息的列表按版本号降序排列
:rtype: List[PackageVersion]
:raises HTTPError: 当包不存在或网络错误时
:raises NetworkError: 当网络连接错误时
"""
# 标准化包名
normalized_name = normalize_package_name(package_name)
# 构建API URL
if server_url is None:
base_url = "https://pypi.org/simple"
else:
base_url = server_url.rstrip('/')
url = f"{base_url}/{urllib.parse.quote(normalized_name)}/"
# 设置请求头
headers = {
'Accept': 'application/vnd.pypi.simple.v1+html'
}
try:
# 发送请求
html_content = get(url, headers=headers).decode('utf-8')
# 解析HTML
parser = PyPIHTMLParser()
parser.feed(html_content)
# 处理链接并提取版本信息
versions = []
for filename, href in parser.links:
# 提取版本号
version_str = extract_version_from_filename(filename)
# 创建Version对象
version = Version(version_str)
# 创建PackageVersion对象
package_version = PackageVersion(
version=version,
url=href
)
versions.append(package_version)
# 按版本号降序排列
versions.sort(key=lambda x: x.version, reverse=True)
return versions
except HTTPError as e:
if e.code == 404:
raise ValueError(f"'{package_name}' 不存在") from e
else:
raise
def main():
"""测试函数"""
try:
# 测试获取beautifulsoup4的版本
print("获取 beautifulsoup4 的版本信息...")
versions = list_versions("beautifulsoup4")
print(f"找到 {len(versions)} 个版本:")
for i, pkg_version in enumerate(versions[:10], 1): # 只显示前10个
print(f"{i}. 版本: {pkg_version.version.version_str}")
print(f" 主版本: {pkg_version.version.major}.{pkg_version.version.minor}.{pkg_version.version.patch}")
if pkg_version.version.prerelease:
print(f" 预发布: {pkg_version.version.prerelease}{pkg_version.version.prerelease_num}")
print(f" URL: {pkg_version.url}")
print()
if len(versions) > 10:
print(f"... 还有 {len(versions) - 10} 个版本")
except Exception as e:
print(f"错误: {e}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,248 @@
import os
import urllib.parse
import urllib.error
import urllib.request
from typing import Dict, Any, Optional, Callable, TYPE_CHECKING
if TYPE_CHECKING:
from http.client import HTTPResponse
class HTTPError(Exception):
"""HTTP请求错误"""
def __init__(self, code: int, message: str):
self.code = code
self.message = message
super().__init__(f"HTTP {code}: {message}")
class NetworkError(Exception):
"""网络连接错误"""
pass
class Response:
"""HTTP响应封装"""
def __init__(self, http_response: "HTTPResponse"):
self._response = http_response
self._content: Optional[bytes] = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def close(self):
"""关闭响应和底层连接。"""
self._response.close()
@property
def status_code(self) -> int:
"""响应状态码"""
return self._response.getcode()
@property
def reason(self) -> str:
"""响应原因短语"""
return self._response.reason
@property
def headers(self) -> Dict[str, Any]:
"""响应头"""
return dict(self._response.headers)
def read(self) -> bytes:
"""读取响应内容 (bytes)"""
if self._content is None:
self._content = self._response.read()
return self._content
def json(self) -> Any:
"""将响应内容解析为JSON"""
import json
return json.loads(self.read())
def request(
url: str,
method: str = "GET",
headers: Optional[Dict[str, str]] = None,
data: Optional[bytes] = None,
timeout: Optional[float] = None
) -> Response:
"""
发送HTTP请求
:param url: 请求URL
:type url: str
:param method: HTTP方法默认为GET
:type method: str
:param headers: 请求头
:type headers: Optional[Dict[str, str]]
:param data: 请求数据
:type data: Optional[bytes]
:param timeout: 超时时间
:type timeout: Optional[float]
:return: 响应对象
:rtype: Response
:raises HTTPError: HTTP错误
:raises NetworkError: 网络连接错误
"""
# 设置默认请求头
default_headers = {
'User-Agent': 'Python-urllib/3.10'
}
if headers:
default_headers.update(headers)
# 创建请求
req = urllib.request.Request(url, data=data, headers=default_headers, method=method)
try:
# 发送请求
response = urllib.request.urlopen(req, timeout=timeout)
return Response(response)
except urllib.error.HTTPError as e:
raise HTTPError(e.code, e.reason) from e
except urllib.error.URLError as e:
raise NetworkError(f"网络连接错误: {e.reason}") from e
def get(url: str, headers: Optional[Dict[str, str]] = None, timeout: Optional[float] = None) -> bytes:
"""
发送GET请求
:param url: 请求URL
:type url: str
:param headers: 请求头
:type headers: Optional[Dict[str, str]]
:param timeout: 超时时间
:type timeout: Optional[float]
:return: 响应内容
:rtype: bytes
"""
with request(url, method="GET", headers=headers, timeout=timeout) as response:
return response.read()
def head(url: str, headers: Optional[Dict[str, str]] = None, timeout: Optional[float] = None) -> Response:
"""
发送HEAD请求
:param url: 请求URL
:type url: str
:param headers: 请求头
:type headers: Optional[Dict[str, str]]
:param timeout: 超时时间
:type timeout: Optional[float]
:return: 响应对象
:rtype: Response
:raises HTTPError: HTTP错误
:raises NetworkError: 网络连接错误
"""
return request(url, method="HEAD", headers=headers, timeout=timeout)
def download_file(
url: str,
dst_path: str,
*,
callback: Optional[Callable[[int, int], None]] = None,
timeout: Optional[float] = None
) -> None:
"""
下载文件
:param url: 文件URL
:type url: str
:param dst_path: 目标路径
:type dst_path: str
:param callback: 进度回调函数参数为(已下载字节数, 总字节数)
:type callback: Optional[Callable[[int, int], None]]
:param timeout: 超时时间
:type timeout: Optional[float]
:raises HTTPError: HTTP错误
:raises NetworkError: 网络连接错误
"""
# 设置请求头
headers = {
'User-Agent': 'Python-urllib/3.10'
}
# 创建请求
req = urllib.request.Request(url, headers=headers)
try:
# 发送请求
with urllib.request.urlopen(req, timeout=timeout) as response:
# 获取文件大小
content_length = response.headers.get('Content-Length')
total_size = int(content_length) if content_length else None
# 确保目标目录存在
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
# 下载文件
downloaded_size = 0
with open(dst_path, 'wb') as f:
while True:
chunk = response.read(8192) # 8KB chunks
if not chunk:
break
f.write(chunk)
downloaded_size += len(chunk)
# 调用进度回调
if callback and total_size:
callback(downloaded_size, total_size)
elif callback and not total_size:
callback(downloaded_size, -1) # 未知总大小
except urllib.error.HTTPError as e:
raise HTTPError(e.code, e.reason) from e
except urllib.error.URLError as e:
raise NetworkError(f"网络连接错误: {e.reason}") from e
except OSError as e:
raise NetworkError(f"文件写入错误: {e}") from e
def main():
"""测试函数"""
try:
# 测试GET请求
print("测试GET请求...")
response_bytes = get("https://httpbin.org/get")
print(f"响应长度: {len(response_bytes)} 字节")
# 测试文件下载
print("\n测试文件下载...")
def progress_callback(downloaded: int, total: int):
if total > 0:
percentage = (downloaded / total) * 100
print(f"下载进度: {downloaded}/{total} 字节 ({percentage:.1f}%)")
else:
print(f"已下载: {downloaded} 字节")
# 下载一个小文件进行测试
download_file(
"https://httpbin.org/bytes/1024",
"test_download.bin",
callback=progress_callback
)
print("下载完成!")
# 清理测试文件
if os.path.exists("test_download.bin"):
os.remove("test_download.bin")
except Exception as e:
print(f"错误: {e}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,178 @@
import os
import ctypes
import shutil
import unicodedata
from typing import Optional
def _enable_windows_ansi():
"""
On Windows, attempts to enable ANSI escape sequence processing.
"""
if os.name == 'nt':
try:
kernel32 = ctypes.windll.kernel32
handle = kernel32.GetStdHandle(-11) # STD_OUTPUT_HANDLE
mode = ctypes.c_ulong()
if kernel32.GetConsoleMode(handle, ctypes.byref(mode)) == 0:
return # Failed to get console mode
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0:
mode.value |= ENABLE_VIRTUAL_TERMINAL_PROCESSING
if kernel32.SetConsoleMode(handle, mode) == 0:
# Fallback for older systems
os.system('')
except Exception:
# Fallback for environments where ctypes fails (e.g., some IDE terminals)
os.system('')
_enable_windows_ansi()
class Color:
"""ANSI color codes"""
RESET = '\033[0m'
BOLD = '\033[1m'
BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
GRAY = '\033[90m'
def get_terminal_width(default=80):
"""Gets the width of the terminal."""
# shutil.get_terminal_size is a high-level and more robust way
return shutil.get_terminal_size((default, 24)).columns
def get_terminal_height(default=24):
"""Gets the height of the terminal."""
return shutil.get_terminal_size((default, 24)).lines
def get_display_width(s: str) -> int:
"""Calculates the display width of a string, accounting for wide characters."""
width = 0
for char in s:
# 'F' (Fullwidth), 'W' (Wide) characters take up 2 columns.
if unicodedata.east_asian_width(char) in ('F', 'W'):
width += 2
else:
width += 1
return width
def truncate_string(s: str, max_width: int) -> str:
"""Truncates a string to a maximum display width, handling wide characters."""
if not s or max_width <= 0:
return ""
width = 0
end_pos = 0
for i, char in enumerate(s):
# 'F' (Fullwidth), 'W' (Wide) characters take up 2 columns.
char_width = 2 if unicodedata.east_asian_width(char) in ('F', 'W') else 1
if width + char_width > max_width:
break
width += char_width
end_pos = i + 1
return s[:end_pos]
def hide_cursor():
"""Hides the terminal cursor."""
print('\033[?25l', end='')
def show_cursor():
"""Shows the terminal cursor."""
print('\033[?25h', end='')
def move_cursor_up(lines: int):
"""Moves the cursor up by a number of lines."""
if lines > 0:
print(f'\033[{lines}A', end='')
def clear_screen():
"""Clears the terminal screen."""
os.system('cls' if os.name == 'nt' else 'clear')
def print_header(text: str, color: str = Color.CYAN):
"""
Prints a centered header with separators that fill the terminal width.
Accounts for CJK character widths.
:param text: The text to display in the header.
:param color: ANSI color code for the header text.
"""
width = get_terminal_width()
padded_text = f" {text} "
text_display_width = get_display_width(padded_text)
# Handle cases where the text is wider than the terminal
if text_display_width >= width:
print(f"\n{Color.BOLD}{color}{text}{Color.RESET}")
return
separator_total_len = width - text_display_width
l_separator_len = separator_total_len // 2
r_separator_len = separator_total_len - l_separator_len
l_separator = "" * l_separator_len
r_separator = "" * r_separator_len
print(f"\n{Color.BOLD}{color}{l_separator}{padded_text}{r_separator}{Color.RESET}")
def print_status(message: str, success: Optional[bool] = None, status: str = 'info', indent: int = 0):
"""
Prints a status message with a symbol and color.
:param message: The status message to print.
:param success: (Deprecated) If True, sets status to 'success'; if False, 'error'.
:param status: 'success', 'error', 'warning', or 'info'.
:param indent: Number of spaces to indent.
"""
prefix = " " * indent
# Backward compatibility
if success is not None:
status = 'success' if success else 'error'
if status == 'success':
symbol = f"{Color.GREEN}"
elif status == 'error':
symbol = f"{Color.RED}"
elif status == 'warning':
symbol = f"{Color.YELLOW}"
else: # 'info'
symbol = f"{Color.BLUE}{Color.RESET}"
print(f"{prefix}[{symbol}] {message}{Color.RESET}")
def wait_key(message: str = ""):
"""
Prints a message and waits for a single key press from the user.
This is a cross-platform function.
:param message: The message to display before waiting.
"""
print(message, end="", flush=True)
if os.name == 'nt':
import msvcrt
msvcrt.getch()
else:
import sys
import tty
import termios
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
# Add a newline after the key is pressed for cleaner output
print()

View File

@ -1,41 +0,0 @@
@echo off
call WPy64-310111\scripts\env.bat
if errorlevel 1 (
goto ERROR
)
set PIP_EXTRA_INDEX_URL=https://mirrors.cloud.tencent.com/pypi/simple/ http://mirrors.aliyun.com/pypi/simple/
echo =========== 安装与更新 KAA ===========
:INSTALL
echo 检查 pip
python -m pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple --upgrade pip
if errorlevel 1 (
goto ERROR
)
pip config set global.trusted-host "pypi.org files.pythonhosted.org pypi.python.org mirrors.aliyun.com mirrors.cloud.tencent.com mirrors.tuna.tsinghua.edu.cn"
pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
echo 安装 ksaa
pip install --upgrade ksaa
if errorlevel 1 (
goto ERROR
)
echo =========== 当前版本 ===========
pip show ksaa
echo =========== 运行 KAA ===========
:RUN
set no_proxy=localhost, 127.0.0.1, ::1
kaa
if errorlevel 1 (
goto ERROR
)
echo =========== 运行结束 ===========
pause
exit /b 0
:ERROR
echo 发生错误,程序退出
pause
exit /b 1

View File

@ -1,54 +0,0 @@
@echo off
cd /D %~dp0
REM https://superuser.com/questions/788924/is-it-possible-to-automatically-run-a-batch-file-as-administrator
REM --> Check for permissions
>nul 2>&1 "%SYSTEMROOT%\system32\cacls.exe" "%SYSTEMROOT%\system32\config\system"
REM --> If error flag set, we do not have admin.
if '%errorlevel%' NEQ '0' (
echo 需要以管理员身份运行。右键此脚本,选择“以管理员身份运行”。
pause
exit /b 1
)
call WPy64-310111\scripts\env.bat
if errorlevel 1 (
goto ERROR
)
set PIP_EXTRA_INDEX_URL=https://mirrors.cloud.tencent.com/pypi/simple/ http://mirrors.aliyun.com/pypi/simple/
echo =========== 安装与更新 KAA ===========
:INSTALL
echo 检查 pip
python -m pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple --upgrade pip
if errorlevel 1 (
goto ERROR
)
pip config set global.trusted-host "pypi.org files.pythonhosted.org pypi.python.org mirrors.aliyun.com mirrors.cloud.tencent.com mirrors.tuna.tsinghua.edu.cn"
pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
echo 安装 ksaa
pip install --upgrade ksaa
if errorlevel 1 (
goto ERROR
)
echo =========== 当前版本 ===========
pip show ksaa
echo =========== 运行 KAA ===========
:RUN
set no_proxy=localhost, 127.0.0.1, ::1
kaa
if errorlevel 1 (
goto ERROR
)
echo =========== 运行结束 ===========
pause
exit /b 0
:ERROR
echo 发生错误,程序退出
pause
exit /b 1

View File

@ -124,3 +124,11 @@ publish-test: package
#
build-bootstrap:
#!{{shebang_pwsh}}
# 构建 Python
cd bootstrap
python -m zipapp kaa-bootstrap
mv kaa-bootstrap.pyz ../dist/bootstrap.pyz -fo
cd ..
# 构建 C++