chore: 完成启动脚本

This commit is contained in:
XcantloadX 2025-01-24 19:14:32 +08:00
parent 44f93fb1e4
commit e2dc2d9055
6 changed files with 204 additions and 5 deletions

View File

@ -1,4 +1,4 @@
# KotonesAutoAssistant 琴音小助手
# Kotone's Auto Assistant 琴音小助手
## 功能
* 自动日常,包括
* 领取礼物(邮件)

40
bootstrap/启动.bat Normal file
View File

@ -0,0 +1,40 @@
@echo off
call WPy64-310111\scripts\env.bat
if errorlevel 1 (
goto ERROR
)
set PIP_EXTRA_INDEX_URL=https://mirrors.cloud.tencent.com/pypi/simple/ http://mirrors.aliyun.com/pypi/simple/
echo =========== 安装与更新 KAA ===========
:INSTALL
echo 检查 pip
python -m pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple --upgrade pip
if errorlevel 1 (
goto ERROR
)
pip config set global.trusted-host "pypi.org files.pythonhosted.org pypi.python.org mirrors.aliyun.com mirrors.cloud.tencent.com mirrors.tuna.tsinghua.edu.cn"
pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
echo 安装 ksaa
pip install --upgrade ksaa
if errorlevel 1 (
goto ERROR
)
echo =========== 当前版本 ===========
pip show ksaa
echo =========== 运行 KAA ===========
:RUN
ksaa
if errorlevel 1 (
goto ERROR
)
echo =========== 运行结束 ===========
pause
exit /b 0
:ERROR
echo 发生错误,程序退出
pause
exit /b 1

View File

@ -0,0 +1,32 @@
@echo off
call WPy64-310111\scripts\env.bat
if errorlevel 1 (
goto ERROR
)
if "%~1"=="" (
echo 请将 whl 文件拖到此脚本上
pause
exit /b 1
)
echo 卸载原有包...
pip uninstall -y ksaa
if errorlevel 1 (
goto ERROR
)
echo 正在安装 %~1 ...
pip install "%~1"
if errorlevel 1 (
goto ERROR
)
echo 安装完成
pause
exit /b 0
:ERROR
echo 发生错误,程序退出
pause
exit /b 1

View File

@ -37,6 +37,7 @@ env:
} else {
./.venv/bin/pip install -r requirements.dev.txt
}
{{venv}} python tools/make_resources.py
# Build the project using pyinstaller
build: env
@ -48,7 +49,8 @@ generate-pyproject-toml version:
from datetime import datetime
today = datetime.now()
version = today.strftime("%Y.%m.%d")
# version = today.strftime("%Y.%m.%d")
version = "{{version}}"
with open("pyproject.template.toml", "r", encoding="utf-8") as f:
template = f.read()
@ -62,7 +64,7 @@ generate-pyproject-toml version:
if (Test-Path dist) { rm -r -fo dist }
if (Test-Path build) { rm -r -fo build }
Write-Host "Generating pyproject.toml..."
just generate-pyproject-toml version
just generate-pyproject-toml {{version}}
Write-Host "Packaging KAA {{version}}..."
@{{venv}} python -m build
Write-Host "Removing pyproject.toml..."
@ -78,3 +80,6 @@ publish version: (package version)
publish-test version: (package version)
@Write-Host "Uploading to PyPI-Test..."
twine upload --repository testpypi dist/* -u __token__ -p $env:PYPI_TEST_TOKEN
#
build-bootstrap:

View File

@ -3,7 +3,7 @@ requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "kaa"
name = "ksaa"
version = "<<<version>>>"
description = "Kotones Auto Assistant"
readme = "README.md"
@ -37,4 +37,4 @@ package-dir = { "kotonebot" = "kotonebot", "kotonebot.res" = "res" }
include-package-data = true
[project.scripts]
kaa = "kotonebot.ui.gr:main"
ksaa = "kotonebot.ui.gr:main"

122
tools/tcp_forward.py Normal file
View File

@ -0,0 +1,122 @@
#!/usr/bin/env python3
import argparse
import socket
import select
import sys
import re
from typing import Tuple, Optional
import time
def parse_address(addr_str: str) -> Tuple[str, int]:
"""Parse address string in format addr:port or port"""
if ':' in addr_str:
host, port = addr_str.split(':')
if '/' in port:
port = port.split('/')[0]
return host, int(port)
else:
if '/' in addr_str:
addr_str = addr_str.split('/')[0]
return 'localhost', int(addr_str)
def create_server(addr: str, port: int) -> socket.socket:
"""Create and bind server socket"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((addr, port))
server.listen(1)
return server
def format_addr(addr: Tuple[str, int]) -> str:
"""Format address tuple to string"""
return f"{addr[0]}:{addr[1]}"
def forward_data(source: socket.socket, destination: socket.socket, source_addr: str, dest_addr: str, verbose: bool) -> bool:
"""Forward data between sockets, return False if connection is closed"""
try:
data = source.recv(4096)
if not data:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Connection closed by {source_addr}")
return False
destination.sendall(data)
if verbose:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Forwarded {len(data)} bytes: {source_addr} -> {dest_addr}")
return True
except Exception as e:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Error forwarding data from {source_addr}: {e}")
return False
def main():
parser = argparse.ArgumentParser(
description='TCP Port Forwarding Tool'
)
parser.add_argument(
'-f', '--from', dest='source', required=True,
help='Forward to address and port (format: addr:port/port or port)'
)
parser.add_argument(
'-t', '--to', dest='target', required=True,
help='Listen on address and port (format: addr:port/port or port)'
)
parser.add_argument(
'-v', '--verbose', action='store_true',
help='Enable verbose output including data transfer information'
)
args = parser.parse_args()
# Parse source and target addresses
forward_addr, forward_port = parse_address(args.source) # Forward to this address
listen_addr, listen_port = parse_address(args.target) # Listen on this address
try:
# Create listening server
server = create_server(listen_addr, listen_port)
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Started TCP forwarding")
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Listening on {listen_addr}:{listen_port}")
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Forwarding to {forward_addr}:{forward_port}")
while True:
# Wait for client connection
client_sock, client_addr = server.accept()
client_str = format_addr(client_addr)
forward_str = f"{forward_addr}:{forward_port}"
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] New connection from {client_str}")
try:
# Connect to forward server
forward_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
forward_sock.connect((forward_addr, forward_port))
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Connected to target {forward_str}")
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Connection established: {client_str} <-> {forward_str}")
# Use select for bidirectional forwarding
while True:
readable, _, _ = select.select(
[client_sock, forward_sock], [], [], 1)
if client_sock in readable:
if not forward_data(client_sock, forward_sock, client_str, forward_str, args.verbose):
break
if forward_sock in readable:
if not forward_data(forward_sock, client_sock, forward_str, client_str, args.verbose):
break
except Exception as e:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Forwarding error: {e}")
finally:
client_sock.close()
forward_sock.close()
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Connection closed: {client_str} <-> {forward_str}")
except KeyboardInterrupt:
print(f"\n[{time.strftime('%Y-%m-%d %H:%M:%S')}] Shutting down...")
except Exception as e:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Error: {e}")
finally:
server.close()
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Server stopped")
if __name__ == '__main__':
main()