This commit is contained in:
ldd 2025-07-07 17:09:33 +08:00 committed by GitHub
commit 8ce90bd8fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 1163 additions and 94 deletions

View File

@ -1 +1 @@
{"definitions":{"d271a24f-efe8-424d-8fd5-f6b3756ba4ca":{"name":"InPurodyuusu.TextSkillCard","displayName":"文字「スキルカード」","type":"template","annotationId":"d271a24f-efe8-424d-8fd5-f6b3756ba4ca","useHintRect":false,"description":"用于技能卡选择对话框"}},"annotations":[{"id":"d271a24f-efe8-424d-8fd5-f6b3756ba4ca","type":"rect","data":{"x1":229,"y1":614,"x2":372,"y2":645}}]}
{"definitions":{"d271a24f-efe8-424d-8fd5-f6b3756ba4ca":{"name":"InPurodyuusu.TextSkillCard","displayName":"文字「スキルカード」","type":"template","annotationId":"d271a24f-efe8-424d-8fd5-f6b3756ba4ca","useHintRect":false,"description":"用于技能卡选择对话框"},"cf97ecca-5f15-475e-a9d6-40219ee4a787":{"name":"InPurodyuusu.BoxSelectCardDialogCards","displayName":"选择技能卡对话框 技能卡区域","type":"hint-box","annotationId":"cf97ecca-5f15-475e-a9d6-40219ee4a787","useHintRect":false}},"annotations":[{"id":"d271a24f-efe8-424d-8fd5-f6b3756ba4ca","type":"rect","data":{"x1":229,"y1":614,"x2":372,"y2":645}},{"id":"cf97ecca-5f15-475e-a9d6-40219ee4a787","type":"rect","data":{"x1":55,"y1":797,"x2":665,"y2":984}}]}

View File

@ -1,2 +1,5 @@
from .idol_card import IdolCard
from .constants import CharacterId
from .constants import CharacterId
from .skill_card import SkillCard
__all__ = ['IdolCard', 'CharacterId', 'SkillCard']

View File

@ -1,14 +1,47 @@
from enum import Enum
class CharacterId(Enum):
hski = "hski" # Hanami Saki, 花海咲季
ttmr = "ttmr" # Tsukimura Temari, 月村手毬
fktn = "fktn" # Fujita Kotone, 藤田ことね
amao = "amao" # Arimura Mao, 有村麻央
kllj = "kllj" # Katsuragi Lilja, 葛城リーリヤ
kcna = "kcna" # Kuramoto China, 倉本千奈
ssmk = "ssmk" # Shiun Sumika, 紫云清夏
shro = "shro" # Shinosawa Hiro, 篠澤廣
hrnm = "hrnm" # Himesaki Rinami, 姫崎莉波
hume = "hume" # Hanami Ume, 花海佑芽
jsna = "jsna" # Juo Sena, 十王星南
"""偶像 ID。"""
hski = "hski" # Hanami Saki, 花海咲季
ttmr = "ttmr" # Tsukimura Temari, 月村手毬
fktn = "fktn" # Fujita Kotone, 藤田ことね
amao = "amao" # Arimura Mao, 有村麻央
kllj = "kllj" # Katsuragi Lilja, 葛城リーリヤ
kcna = "kcna" # Kuramoto China, 倉本千奈
ssmk = "ssmk" # Shiun Sumika, 紫云清夏
shro = "shro" # Shinosawa Hiro, 篠澤広
hrnm = "hrnm" # Himesaki Rinami, 姫崎莉波
hume = "hume" # Hanami Ume, 花海佑芽
jsna = "jsna" # Juo Sena, 十王星南
hmsz = "hmsz" # Hataya Misuzu, 秦谷美鈴
class ExamEffectType(Enum):
"""
考试流派
温存根据 ShowExamEffectType 决定
"""
good_condition = "ProduceExamEffectType_ExamParameterBuff"
"""好调"""
focus = "ProduceExamEffectType_ExamLessonBuff"
"""集中"""
good_impression = "ProduceExamEffectType_ExamReview"
"""好印象"""
motivation = "ProduceExamEffectType_ExamCardPlayAggressive"
"""干劲"""
confidence = "ProduceExamEffectType_ExamConcentration"
"""强气"""
full_power = "ProduceExamEffectType_ExamFullPower"
"""全力"""
class ShowExamEffectType(Enum):
"""
若为 ProduceExamEffectType_ExamPreservation 则偶像卡推荐流派显示为 温存
目前分 全力-温存 强气-温存 两种选择卡牌还是根据 全力/强气 来选择
"""
unknown = "ProduceExamEffectType_Unknown"
"""推荐流派与ExamEffectType对应"""
conservation = "ProduceExamEffectType_ExamPreservation"
"""推荐流派显示为温存"""

View File

@ -1,13 +1,16 @@
from dataclasses import dataclass
from .sqlite import select, select_many
from .constants import CharacterId
from .constants import ExamEffectType, ShowExamEffectType
@dataclass
class IdolCard:
"""偶像卡"""
id: str
skin_id: str
exam_effect_type: ExamEffectType
show_exam_effect_type: ShowExamEffectType
is_another: bool
another_name: str | None
name: str
@ -21,6 +24,8 @@ class IdolCard:
SELECT
IC.id AS cardId,
ICS.id AS skinId,
IC.examEffectType AS examEffectType,
IC.showExamEffectType AS showExamEffectType,
Char.lastName || ' ' || Char.firstName || ' ' || IC.name AS name,
NOT (IC.originalIdolCardSkinId = ICS.id) AS isAnotherVer,
ICS.name AS anotherVerName
@ -31,9 +36,16 @@ class IdolCard:
""", sid)
if row is None:
return None
card_id, skin_id, name, is_another, another_name = row
return cls(card_id, skin_id, is_another, another_name, name)
return cls(
id=row["cardId"],
skin_id=row["skinId"],
exam_effect_type=ExamEffectType(row["examEffectType"]),
show_exam_effect_type=ShowExamEffectType(row["showExamEffectType"]),
is_another=bool(row["isAnotherVer"]),
another_name=row["anotherVerName"],
name=row["name"]
)
@classmethod
def all(cls) -> list['IdolCard']:
"""获取所有偶像卡"""
@ -41,6 +53,8 @@ class IdolCard:
SELECT
IC.id AS cardId,
ICS.id AS skinId,
IC.examEffectType AS examEffectType,
IC.showExamEffectType AS showExamEffectType,
Char.lastName || ' ' || Char.firstName || ' ' || IC.name AS name,
NOT (IC.originalIdolCardSkinId = ICS.id) AS isAnotherVer,
ICS.name AS anotherVerName
@ -50,11 +64,20 @@ class IdolCard:
""")
results = []
for row in rows:
card_id, skin_id, name, is_another, another_name = row
results.append(cls(card_id, skin_id, is_another, another_name, name))
results.append(cls(
id=row["cardId"],
skin_id=row["skinId"],
exam_effect_type=ExamEffectType(row["examEffectType"]),
show_exam_effect_type=ShowExamEffectType(row["showExamEffectType"]),
is_another=bool(row["isAnotherVer"]),
another_name=row["anotherVerName"],
name=row["name"]
))
return results
if __name__ == '__main__':
from pprint import pprint as print
print(IdolCard.from_skin_id('i_card-skin-fktn-3-006'))
print(IdolCard.all())
print(IdolCard.all())

View File

@ -0,0 +1,159 @@
from enum import Enum
from typing import Literal
from dataclasses import dataclass
from kotonebot.kaa.db.idol_card import IdolCard
from .sqlite import select, select_many
from .constants import CharacterId
class CardType(Enum):
"""卡牌作用类型"""
MENTAL = "ProduceCardCategory_MentalSkill"
"""M 卡"""
ACTIVE = "ProduceCardCategory_ActiveSkill"
"""A 卡"""
TROUBLE = "ProduceCardCategory_Trouble"
"""T 卡"""
class PlanType(Enum):
"""卡牌职业分类"""
COMMON = "ProducePlanType_Common"
"""通用"""
SENSE = "ProducePlanType_Plan1"
"""感性"""
LOGIC = "ProducePlanType_Plan2"
"""理性"""
ANOMALY = "ProducePlanType_Plan3"
"""非凡"""
class PlayMovePositionType(Enum):
"""卡牌打出后 除外 还是 进入弃牌堆"""
LOST = "ProduceCardMovePositionType_Lost"
"""除外,洗牌后无法抽到"""
GRAVE = "ProduceCardMovePositionType_Grave"
"""进入弃牌堆,洗牌后仍能抽到"""
@dataclass
class SkillCard:
"""技能卡"""
id: str
asset_id: str
"""资源 ID。"""
plan_type: PlanType
"""卡牌职业分类。"""
card_type: CardType
"""卡牌作用类型。"""
name: str
"""卡牌名称。"""
once: bool
"""此卡牌在考试或课程中是否只会出现一次。"""
play_move_position_type: PlayMovePositionType
"""此卡牌在考试或课程中使用后除外还是进入弃牌堆。"""
origin_idol_card: str | None
"""此卡牌所属的偶像卡。"""
origin_support_card: str | None
"""此卡牌所属的支援卡。"""
is_character_asset: bool
"""
此卡牌的资源图片是否会随偶像变化
若为 True `asset_id` 有多个
实际资源 ID `[f'{self.asset_id}-{ii}' for ii in idol_ids]`
"""
@property
def is_from_idol_card(self) -> bool:
"""此卡牌是否来自偶像卡。"""
return self.origin_idol_card is not None
@property
def is_from_support_card(self) -> bool:
"""此卡牌是否来自支援卡。"""
return self.origin_support_card is not None
@property
def asset_ids(self) -> list[str]:
"""
此卡牌的所有资源 ID包括 `is_character_asset` True 的情况
"""
if not self.is_character_asset:
return [self.asset_id]
return [f'{self.asset_id}-{ii.value}' for ii in CharacterId]
@classmethod
def all(cls) -> list['SkillCard']:
"""获取所有技能卡"""
rows = select_many("""
SELECT
id,
assetId,
planType,
category AS cardType,
name,
noDeckDuplication AS once,
playMovePositionType,
originIdolCardId AS idolCardId,
originSupportCardId AS supportCardId,
isCharacterAsset
FROM ProduceCard;
""")
results = []
for row in rows:
results.append(cls(
id=row["id"],
asset_id=row["assetId"],
plan_type=PlanType(row["planType"]),
card_type=CardType(row["cardType"]),
name=row["name"],
once=bool(row["once"]),
play_move_position_type=PlayMovePositionType(row["playMovePositionType"]),
origin_idol_card=row["idolCardId"],
origin_support_card=row["supportCardId"],
is_character_asset=bool(row["isCharacterAsset"])
))
return results
@classmethod
def from_asset_id(cls, asset_id: str) -> 'SkillCard | None':
"""根据资源 ID 查询 SkillCard。"""
for ci in CharacterId:
if asset_id.endswith(ci.value):
asset_id = asset_id[:-len(ci.value) - 1]
break
row = select("""
SELECT
id,
assetId,
planType,
category AS cardType,
name,
noDeckDuplication AS once,
playMovePositionType,
originIdolCardId AS idolCardId,
originSupportCardId AS supportCardId,
isCharacterAsset
FROM ProduceCard
WHERE assetId = ?;
""", asset_id)
if row is None:
return None
return cls(
id=row["id"],
asset_id=row["assetId"],
plan_type=PlanType(row["planType"]),
card_type=CardType(row["cardType"]),
name=row["name"],
once=bool(row["once"]),
play_move_position_type=PlayMovePositionType(row["playMovePositionType"]),
origin_idol_card=row["idolCardId"],
origin_support_card=row["supportCardId"],
is_character_asset=bool(row["isCharacterAsset"])
)
if __name__ == '__main__':
from pprint import pprint as print
print(SkillCard.from_asset_id('img_general_skillcard_men-2_077-hski'))

View File

@ -1,25 +1,49 @@
import os
import sqlite3
from typing import Any, cast
import threading
from logging import getLogger
from typing import Any, cast, Dict, List, Optional
from kotonebot.kaa import resources as res
_db: sqlite3.Connection | None = None
_db_path = cast(str, res.__path__)[0] + '/game.db'
def select_many(query: str, *args) -> list[Any]:
global _db
if not _db:
_db = sqlite3.connect(_db_path)
c = _db.cursor()
_db_dict = {}
logger = getLogger(__name__)
def _dict_factory(cursor, row):
"""将查询结果转换为字典格式"""
return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}
def _ensure_db() -> sqlite3.Connection:
"""
确保数据库连接已建立
培育过程是新开线程不同线程的connection不能使用
# TODO 培育结束需要关闭connection
"""
global _db_dict
thread_id = threading.current_thread().ident
if thread_id not in _db_dict:
_db_dict[thread_id] = sqlite3.connect(_db_path)
_db_dict[thread_id].row_factory = _dict_factory
logger.info("Database connection established for thread: %s", thread_id)
return _db_dict[thread_id]
def select_many(query: str, *args) -> List[Dict[str, Any]]:
"""执行查询并返回多行结果,每行为字典格式"""
db = _ensure_db()
c = db.cursor()
c.execute(query, args)
return c.fetchall()
def select(query: str, *args) -> Any:
global _db
if not _db:
_db = sqlite3.connect(_db_path)
c = _db.cursor()
def select(query: str, *args) -> Optional[Dict[str, Any]]:
"""执行查询并返回单行结果,为字典格式"""
db = _ensure_db()
c = db.cursor()
c.execute(query, args)
return c.fetchone()
return c.fetchone()

View File

@ -18,9 +18,10 @@ logger = logging.getLogger(__name__)
_db: ImageDatabase | None = None
# OpenCV HSV 颜色范围
RED_DOT = ((157, 205, 255), (179, 255, 255)) # 红点
ORANGE_SELECT_BORDER = ((9, 50, 106), (19, 255, 255)) # 当前选中的偶像的橙色边框
WHITE_BACKGROUND = ((0, 0, 234), (179, 40, 255)) # 白色背景
RED_DOT = ((157, 205, 255), (179, 255, 255)) # 红点
ORANGE_SELECT_BORDER = ((9, 50, 106), (19, 255, 255)) # 当前选中的偶像的橙色边框
WHITE_BACKGROUND = ((0, 0, 234), (179, 40, 255)) # 白色背景
def extract_idols(img: MatLike) -> list[RectTuple]:
"""
@ -49,6 +50,7 @@ def extract_idols(img: MatLike) -> list[RectTuple]:
rects.append((x, y, w, h))
return rects
def display_rects(img: MatLike, rects: list[RectTuple]) -> MatLike:
"""Draw rectangles on the image and display them."""
result = img.copy()
@ -57,10 +59,11 @@ def display_rects(img: MatLike, rects: list[RectTuple]) -> MatLike:
# Draw rectangle with green color and 2px thickness
cv2.rectangle(result, (x, y), (x + w, y + h), (0, 255, 0), 2)
# Optionally add text label
cv2.putText(result, f"{w}x{h}", (x, y - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
cv2.putText(result, f"{w}x{h}", (x, y - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
return result
def draw_idol_preview(img: MatLike, rects: list[RectTuple], db: ImageDatabase, idol_path: str) -> MatLike:
"""
在预览图上绘制所有匹配到的偶像
@ -73,31 +76,32 @@ def draw_idol_preview(img: MatLike, rects: list[RectTuple], db: ImageDatabase, i
"""
# 创建一个与原图大小相同的白色背景图片
preview_img = np.ones_like(img) * 255
# 在预览图上绘制所有匹配到的偶像
for rect in rects:
x, y, w, h = rect
idol_img = img[y:y+h, x:x+w]
idol_img = img[y:y + h, x:x + w]
match = db.match(idol_img, 20)
if not match:
continue
file = os.path.join(idol_path, match.key)
found_img = cv2_imread(file)
# 将找到的偶像图片缩放至与检测到的矩形大小相同
resized_found_img = cv2.resize(found_img, (w, h))
# 将缩放后的图片放到预览图上对应位置
preview_img[y:y+h, x:x+w] = resized_found_img
preview_img[y:y + h, x:x + w] = resized_found_img
# 在预览图上绘制矩形框
cv2.rectangle(preview_img, (x, y), (x + w, y + h), (0, 255, 0), 2)
# 可选添加偶像ID标签
cv2.putText(preview_img, match.key.split('.')[0], (x, y - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
cv2.putText(preview_img, match.key.split('.')[0], (x, y - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
return preview_img
def idols_db() -> ImageDatabase:
global _db
if _db is None:
@ -122,6 +126,7 @@ def match_idol(skin_id: str, idol_img: MatLike) -> DatabaseQueryResult | None:
else:
return None
@action('定位偶像', screenshot_mode='manual-inherit')
def locate_idol(skin_id: str) -> Rect | None:
"""
@ -149,7 +154,7 @@ def locate_idol(skin_id: str) -> Rect | None:
img = device.screenshot()
# 只保留 BoxIdolOverviewIdols 区域
mask = np.zeros_like(img)
mask[y:y+h, x:x+w] = img[y:y+h, x:x+w]
mask[y:y + h, x:x + w] = img[y:y + h, x:x + w]
img = mask
# 检测 & 查询
rects = extract_idols(img)
@ -158,7 +163,7 @@ def locate_idol(skin_id: str) -> Rect | None:
# cv2.waitKey(0)
for rect in rects:
rx, ry, rw, rh = rect
idol_img = img[ry:ry+rh, rx:rx+rw]
idol_img = img[ry:ry + rh, rx:rx + rw]
match = db.match(idol_img, 20)
logger.debug('Result rect: %s, match: %s', repr(rect), repr(match))
# Key 格式:{skin_id}_{index}
@ -172,5 +177,23 @@ def locate_idol(skin_id: str) -> Rect | None:
# # 使用新函数绘制预览图
# preview_img = draw_idol_preview(img, rects, db, path)
@action('重新培育页面识别偶像卡', screenshot_mode='manual-inherit')
def find_idol_skin_id_on_resume_produce(img: MatLike) -> str | None:
"""
继续培育 界面查找偶像皮肤id
默认 数据库中的key 偶像皮肤id_\d.png
:return:
"""
db = idols_db()
rx, ry, rw, rh = R.Produce.BoxResumeDialogIdolCard.xywh
idol_img = img[ry:ry + rh, rx:rx + rw]
match = db.match(idol_img, 20)
if match:
return match.key.rsplit("_", 1)[0]
else:
return None
if __name__ == '__main__':
locate_idol('i_card-skin-fktn-3-006')

View File

@ -0,0 +1,82 @@
import logging
from dataclasses import dataclass
import cv2
import numpy as np
from cv2.typing import MatLike
from kotonebot.kaa.tasks import R
from kotonebot.primitives import Rect
from kotonebot.kaa.util import paths
from kotonebot.kaa.image_db import ImageDatabase, HistDescriptor, FileDataSource
from kotonebot.kaa.db import SkillCard
BIN_COUNT = 10
logger = logging.getLogger(__name__)
_db: ImageDatabase | None = None
@dataclass
class SkillCardElement:
rect: Rect
skill_card: SkillCard | None
def _find_cards(img: MatLike) -> list[Rect]:
x, y, w, h = R.InPurodyuusu.BoxSelectCardDialogCards.xywh
# 非目标区域置白
white = np.full_like(img, 255)
white[y:y+h, x:x+w] = img[y:y+h, x:x+w]
img = white
# cv2.imshow('White', cv2.resize(img, (0, 0), fx=0.5, fy=0.5))
# 灰度、高斯模糊、查找边缘、查找轮廓
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = cv2.GaussianBlur(img, (21, 21), 0)
img = cv2.Canny(img, 30, 100)
# cv2.imshow('Canny', cv2.resize(img, (0, 0), fx=0.5, fy=0.5))
contours, _ = cv2.findContours(img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
result = np.zeros_like(img)
# 筛选比例 1:1 的轮廓
results = []
for contour in contours:
rx, ry, rw, rh = cv2.boundingRect(contour)
area = cv2.contourArea(contour)
if rw == 0 or rh == 0:
continue
ratio = rw / rh
# print(rw * rh)
if 0.8 <= ratio <= 1.2 and rw * rh > 6000:
results.append(Rect(rx, ry, rw, rh))
# result = cv2.rectangle(result, (rx, ry), (rx+rw, ry+rh), (0, 255, 0), 2)
# cv2.imshow('Card ' + str(len(results)), white[ry:ry+rh, rx:rx+rw])
# cv2.imshow('Contours', cv2.resize(result, (0, 0), fx=0.5, fy=0.5))
results.sort(key=lambda p: p.x1)
logger.debug(f'{len(results)} cards detected.')
return results
def cards_db() -> ImageDatabase:
global _db
if _db is None:
logger.info('Loading skill cards database...')
path = paths.resource('skill_cards')
db_path = paths.cache('skill_cards.pkl')
_db = ImageDatabase(FileDataSource(str(path)), db_path, HistDescriptor(BIN_COUNT), name='skill_cards')
return _db
def extract_cards(img: MatLike) -> list[SkillCardElement]:
db = cards_db()
results = []
for rect in _find_cards(img):
x, y, w, h = rect.xywh
card_img = img[y:y+h, x:x+w]
match = db.match(card_img, 20)
if match:
logger.debug('Skill card match: %s', match)
asset_id = match.key.split('.')[0]
results.append(SkillCardElement(rect, SkillCard.from_asset_id(asset_id)))
return results
if __name__ == '__main__':
from pprint import pprint
path = r"E:\GithubRepos\KotonesAutoAssistant.worktrees\feat\kotonebot-resource\sprites\jp\in_purodyuusu\screenshot_select_skill_card_2.png"
img = cv2.imread(path)
pprint(extract_cards(img))
cv2.waitKey(0)

View File

@ -97,6 +97,7 @@ class ImageDatabase:
for key, value in self.source:
self.insert(key, value)
self.save()
logger.debug('Data source loaded.')
@property
def db(self) -> Db:
@ -117,11 +118,12 @@ class ImageDatabase:
若为 MatLike必须为 BGR 格式
:param overwrite: 是否覆盖已存在的记录
"""
if not overwrite and key in self.db.data:
return
if isinstance(image, str):
image = cv2_imread(image)
if overwrite or key not in self.db.data:
self.db.insert(key, self.descriptor(image))
logger.debug('Inserted image: %s', key)
self.db.insert(key, self.descriptor(image))
logger.debug('Inserted image: %s', key)
def insert_many(self, images: dict[str, str | MatLike], *, overwrite: bool = False):
"""

View File

View File

@ -0,0 +1,26 @@
from kotonebot.kaa.common import ConfigBaseModel
from kotonebot.kaa.db.constants import ExamEffectType
# TODO: 这个文件应该写在 kotonebot.kaa.common 的 BaseConfig文件中
# TODO用户可以设置自己的流派卡组把卡填入下面的设置中未填入的卡默认不选。
# 单流派卡组设置
class SingleDeckConfig(ConfigBaseModel):
# 角色流派。目前只考虑ExamEffectType温存目前分全力温存和强气温存所以没有温存设置。
archetype: ExamEffectType
# 核心卡,在商店页面会购买
core_cards: list[str] = []
# 高优先度卡
high_priority_cards: list[str] = []
# 中优先度卡
medium_priority_cards: list[str] = []
# 低优先度卡
low_priority_cards: list[str] = []
# 所有卡组设置
class DeckConfig(ConfigBaseModel):
# 预设卡组
pre_built_deck: list[SingleDeckConfig]
# 自定义卡组
custom_deck: list[SingleDeckConfig]

View File

@ -0,0 +1,10 @@
from enum import IntEnum
# 卡牌抉择优先度,越大优先度越低
class CardPriority(IntEnum):
core = 0
high = 1
medium = 2
low = 3
other = 99

View File

@ -0,0 +1,61 @@
from dataclasses import dataclass
from logging import getLogger
from kotonebot.kaa.db.constants import ExamEffectType
from kotonebot.kaa.skill_card.card_deck_config import DeckConfig, SingleDeckConfig
from kotonebot.kaa.skill_card.enum_constant import CardPriority
logger = getLogger(__name__)
@dataclass
class GlobalIdolSetting:
def __init__(self):
# 是否需要刷新全局配置,理论上新开培育、重新培育都需要更新
self.need_update: bool = True
self.idol_archetype: ExamEffectType = ExamEffectType.good_impression
self.card_deck: dict = {}
def new_play(self):
self.need_update = True
self.card_deck.clear()
logger.info("New game, wait for update")
def update_deck(self, idol_archetype: ExamEffectType, config: DeckConfig):
"""
根据流派选择初始化对应的卡组配置如果自定义有就使用自定义没有就使用预设
:param idol_archetype: 偶像流派
:param config: 卡组配置
:return:
"""
if not self.need_update:
return
self.idol_archetype = idol_archetype
self.need_update = False
self.card_deck.clear()
for single_deck_config in config.custom_deck:
if single_deck_config.archetype == idol_archetype:
self.refresh_card_deck(single_deck_config)
logger.info("Use custom card deck,idol archetype:%s", idol_archetype)
return
for single_deck_config in config.pre_built_deck:
if single_deck_config.archetype == idol_archetype:
self.refresh_card_deck(single_deck_config)
logger.info("Use pre built card deck,idol archetype:%s", idol_archetype)
return
logger.warning("No deck config for idol archetype: %s", idol_archetype)
self.need_update = True
def refresh_card_deck(self, card_deck_config: SingleDeckConfig):
self.card_deck.update({card: CardPriority.low for card in card_deck_config.low_priority_cards})
self.card_deck.update({card: CardPriority.medium for card in card_deck_config.medium_priority_cards})
self.card_deck.update({card: CardPriority.high for card in card_deck_config.high_priority_cards})
self.card_deck.update({card: CardPriority.core for card in card_deck_config.core_cards})
def get_card_priority(self, card_id: str) -> CardPriority:
"""
根据卡名来查看此卡的选卡优先级
:param card_id: 卡id
:return: 优先级越小越高不在配置卡组则返回other(99)
"""
return self.card_deck.get(card_id, CardPriority.other)

View File

@ -0,0 +1,41 @@
from logging import getLogger
from dataclasses import dataclass
from kotonebot.kaa.db.skill_card import PlayMovePositionType
from kotonebot.kaa.game_ui.skill_card_select import SkillCardElement
from kotonebot.kaa.skill_card.enum_constant import CardPriority
from kotonebot.kaa.skill_card_action.global_idol_setting_action import idol_setting
logger = getLogger(__name__)
@dataclass
class ActualCard:
skill_card_element: SkillCardElement
priority: CardPriority
@staticmethod
def create_by(card: SkillCardElement) -> 'ActualCard':
"""
:param card: 读取到的技能卡信息
:return: cls
"""
priority = idol_setting.get_card_priority(card.skill_card.id)
return ActualCard(card, priority)
def __lt__(self, other):
return self.priority < other.priority
def select(self) -> bool:
"""
选卡时可以选择
:return:
"""
return self.priority != CardPriority.other
def lost(self):
"""
是否为 除外
:return:
"""
return self.skill_card_element.skill_card.play_move_position_type == PlayMovePositionType.LOST

View File

@ -0,0 +1,249 @@
{
"pre_built_deck": [
{
"archetype": "ProduceExamEffectType_ExamParameterBuff",
"core_cards": [
"p_card-01-act-3_049",
"p_card-01-men-3_006",
"p_card-01-men-3_036"
],
"high_priority_cards": [
"p_card-00-men-2_012",
"p_card-00-men-3_003",
"p_card-00-men-3_005",
"p_card-01-men-1_034",
"p_card-01-men-2_038",
"p_card-01-men-2_035",
"p_card-01-men-2_036",
"p_card-01-men-2_037",
"p_card-01-act-2_001",
"p_card-01-act-3_030",
"p_card-01-act-3_029",
"p_card-01-men-3_033",
"p_card-01-men-3_035",
"p_card-01-act-3_010"
],
"medium_priority_cards": [
"p_card-01-men-2_039",
"p_card-01-men-2_043",
"p_card-01-men-2_034",
"p_card-01-men-2_040",
"p_card-01-act-1_020",
"p_card-01-act-1_021",
"p_card-01-act-1_036",
"p_card-01-act-2_042"
],
"low_priority_cards": [
"p_card-01-act-2_003",
"p_card-01-act-2_033",
"p_card-01-act-2_059",
"p_card-01-men-2_041",
"p_card-00-men-1_007",
"p_card-00-men-3_012",
"p_card-00-men-3_011"
]
},
{
"archetype": "ProduceExamEffectType_ExamLessonBuff",
"core_cards": [
"p_card-01-act-3_049",
"p_card-01-men-3_006",
"p_card-01-men-3_036"
],
"high_priority_cards": [
"p_card-00-men-2_012",
"p_card-00-men-3_003",
"p_card-00-men-3_005",
"p_card-01-men-1_034",
"p_card-01-men-2_038",
"p_card-01-men-2_035",
"p_card-01-men-2_036",
"p_card-01-men-2_037",
"p_card-01-act-2_001",
"p_card-01-act-3_030",
"p_card-01-act-3_029",
"p_card-01-men-3_033",
"p_card-01-men-3_035",
"p_card-01-act-3_010"
],
"medium_priority_cards": [
"p_card-01-men-2_039",
"p_card-01-men-2_043",
"p_card-01-men-2_034",
"p_card-01-men-2_040",
"p_card-01-act-1_020",
"p_card-01-act-1_021",
"p_card-01-act-1_036",
"p_card-01-act-2_042"
],
"low_priority_cards": [
"p_card-01-act-2_003",
"p_card-01-act-2_033",
"p_card-01-act-2_059",
"p_card-01-men-2_041",
"p_card-00-men-1_007",
"p_card-00-men-3_012",
"p_card-00-men-3_011"
]
},
{
"archetype": "ProduceExamEffectType_ExamReview",
"core_cards": [
"p_card-02-act-3_050",
"p_card-02-men-3_002",
"p_card-02-men-3_004",
"p_card-02-men-3_040"
],
"high_priority_cards": [
"p_card-00-men-2_012",
"p_card-00-men-3_003",
"p_card-00-men-3_005",
"p_card-02-men-2_056",
"p_card-02-act-2_048",
"p_card-02-men-2_058",
"p_card-02-men-2_004",
"p_card-02-men-3_041",
"p_card-02-men-3_043",
"p_card-02-men-3_042",
"p_card-02-act-2_049",
"p_card-02-men-2_057",
"p_card-02-men-2_060"
],
"medium_priority_cards": [
"p_card-02-act-1_003",
"p_card-02-men-1_035",
"p_card-02-men-2_051",
"p_card-02-men-2_054",
"p_card-02-act-3_001",
"p_card-02-act-3_052"
],
"low_priority_cards": [
"p_card-02-act-2_046",
"p_card-02-men-2_050",
"p_card-02-men-2_052",
"p_card-02-men-2_008",
"p_card-00-men-1_007",
"p_card-00-men-3_012",
"p_card-00-men-3_011"
]
},
{
"archetype": "ProduceExamEffectType_ExamCardPlayAggressive",
"core_cards": [
"p_card-02-men-3_002",
"p_card-02-men-3_040",
"p_card-02-act-1_037",
"p_card-02-act-2_062"
],
"high_priority_cards": [
"p_card-00-men-2_012",
"p_card-00-men-3_003",
"p_card-00-men-3_005",
"p_card-02-men-2_056",
"p_card-02-men-2_004",
"p_card-02-men-2_052",
"p_card-02-men-2_008",
"p_card-02-act-3_039",
"p_card-02-men-3_041",
"p_card-02-men-3_043"
],
"medium_priority_cards": [
"p_card-02-act-3_038",
"p_card-02-men-3_044",
"p_card-02-act-3_045",
"p_card-02-act-2_047",
"p_card-02-men-2_054"
],
"low_priority_cards": [
"p_card-02-act-1_004",
"p_card-02-men-1_006",
"p_card-02-men-2_051",
"p_card-00-men-1_007",
"p_card-00-men-3_012",
"p_card-00-men-3_011"
]
},
{
"archetype": "ProduceExamEffectType_ExamConcentration",
"core_cards": [
"p_card-03-men-3_058",
"p_card-03-men-3_061"
],
"high_priority_cards": [
"p_card-00-men-2_012",
"p_card-00-men-3_003",
"p_card-00-men-3_005",
"p_card-03-act-2_081",
"p_card-03-men-2_066",
"p_card-03-men-2_076",
"p_card-03-men-2_080",
"p_card-03-men-2_074",
"p_card-03-act-3_054",
"p_card-03-act-3_056",
"p_card-03-act-3_055",
"p_card-03-act-3_065",
"p_card-03-men-3_062"
],
"medium_priority_cards": [
"p_card-03-act-1_044",
"p_card-03-men-1_039",
"p_card-03-men-1_049",
"p_card-03-act-2_063",
"p_card-03-act-2_073",
"p_card-03-men-2_077"
],
"low_priority_cards": [
"p_card-03-men-1_048",
"p_card-03-act-2_068",
"p_card-03-act-2_067",
"p_card-00-men-1_007",
"p_card-00-men-3_012",
"p_card-00-men-3_011"
]
},
{
"archetype": "ProduceExamEffectType_ExamFullPower",
"core_cards": [
"p_card-03-men-2_075",
"p_card-03-men-3_058"
],
"high_priority_cards": [
"p_card-00-men-2_012",
"p_card-00-men-3_003",
"p_card-00-men-3_005",
"p_card-03-men-2_066",
"p_card-03-men-2_076",
"p_card-03-men-2_080",
"p_card-03-men-2_074",
"p_card-03-act-3_057",
"p_card-03-act-3_056",
"p_card-03-act-3_053",
"p_card-03-act-3_065",
"p_card-03-men-3_062",
"p_card-03-men-3_063"
],
"medium_priority_cards": [
"p_card-03-men-1_049",
"p_card-03-men-1_048",
"p_card-03-men-1_051",
"p_card-03-act-2_063",
"p_card-03-act-2_071",
"p_card-03-men-2_064",
"p_card-03-men-2_077",
"p_card-03-act-3_055"
],
"low_priority_cards": [
"p_card-03-act-1_038",
"p_card-03-act-1_042",
"p_card-03-act-2_072",
"p_card-03-act-2_068",
"p_card-03-act-2_082",
"p_card-00-men-1_007",
"p_card-00-men-3_012",
"p_card-00-men-3_011"
]
}
],
"custom_deck": [
]
}

View File

@ -0,0 +1,39 @@
import json
import os
from logging import getLogger
from kotonebot.kaa.db import IdolCard
from kotonebot.kaa.skill_card.card_deck_config import DeckConfig
from kotonebot.kaa.skill_card.global_idol_setting import GlobalIdolSetting
logger = getLogger(__name__)
# TODO: 获取默认配置
def get_default_config() -> DeckConfig:
path = os.path.join(os.path.dirname(__file__), 'default_card_deck_config.json')
if not os.path.exists(path):
raise FileNotFoundError(path)
with open(path, 'r', encoding='utf-8') as f:
root = json.load(f)
return DeckConfig.model_validate(root)
# TODO: 这里应该在游戏开始就初始化在新培育、继续培育时调用new_play()待更新在读取偶像信息后调用update_deck()完成更新
idol_setting = GlobalIdolSetting()
# TODO: 这里应该从配置文件中读取
deck_config = get_default_config()
def update_archetype_by_idol_skin_id(idol_skin_id: str):
"""
根据偶像皮肤id更新全局偶像信息
:param idol_skin_id:
:return:
"""
idol_setting.new_play()
idol_card = IdolCard.from_skin_id(idol_skin_id)
if idol_card:
idol_setting.update_deck(idol_card.exam_effect_type, deck_config)
else:
logger.warning(f"Can`t found archetype by skin id: {idol_skin_id}")

View File

@ -0,0 +1,139 @@
from logging import getLogger
from kotonebot import device, image, Interval, ocr, contains, Countdown, action
from kotonebot.backend.core import HintBox
from kotonebot.kaa.game_ui import dialog, badge
from kotonebot.kaa.game_ui.skill_card_select import extract_cards
from kotonebot.kaa.skill_card_action.card_reader import ActualCard
from kotonebot.kaa.tasks import R
from kotonebot.primitives import Rect
logger = getLogger(__name__)
# 除外技能卡按钮位置
RemoveSkillButton = HintBox(x1=90, y1=1076, x2=200, y2=1115, source_resolution=(720, 1280))
# 除外技能卡确认按钮位置
RemoveConfirmButton = HintBox(x1=440, y1=1125, x2=600, y2=1190, source_resolution=(720, 1280))
# 再抽選按钮位置
RefreshSkillButton = HintBox(x1=570, y1=1076, x2=655, y2=1115, source_resolution=(720, 1280))
@action('获取技能卡', screenshot_mode='manual-inherit')
def select_skill_card():
"""获取技能卡(スキルカード)"""
logger.debug("Locating all skill cards...")
it = Interval(0.5)
cd = Countdown(sec=60).start()
while not cd.expired():
device.screenshot()
it.wait()
# 是否显示技能卡选择指导的对话框
# [kotonebot-resource/sprites/jp/in_purodyuusu/screenshot_show_skill_card_select_guide_dialog.png]
if image.find(R.InPurodyuusu.TextSkillCardSelectGuideDialogTitle):
# 默认就是显示,直接确认
dialog.yes()
continue
img = device.screenshot()
skill_card_elements = extract_cards(img)
if skill_card_elements:
cards = [ActualCard.create_by(skill_card_element) for skill_card_element in skill_card_elements]
cards = sorted(cards)
target_card = cards[0]
select_suggest = False
# 非卡组配置的卡时,尝试刷新
if not target_card.select():
if try_refresh_skill_card(target_card.skill_card_element.rect):
it.wait()
continue
# 如果没刷新次数,尝试选取除外卡,没有除外卡才选择推荐卡
if once_cards := [card for card in cards if card.lost()]:
target_card = once_cards[0]
else:
select_suggest = True
if select_suggest:
# 既没有刷新,也没有除外卡,选择推荐卡
card_rect = find_recommended_card_rect([card.skill_card_element.rect for card in cards])
logger.info(f"Select recommended card")
device.click(card_rect)
else:
logger.info(f"Select {target_card.skill_card_element.skill_card.name}")
device.click(target_card.skill_card_element.rect)
it.wait()
else:
logger.info("No skill card found, retrying...")
continue
device.screenshot()
it.wait()
if acquire_btn := image.find(R.InPurodyuusu.AcquireBtnDisabled):
logger.debug("Click acquire button")
device.click(acquire_btn)
return
logger.warning("Skill card select failed")
@action('寻找推荐卡', screenshot_mode='manual-inherit')
def find_recommended_card_rect(cards: list[Rect]) -> Rect:
# 判断是否有推荐卡
rec_badges = image.find_all(R.InPurodyuusu.TextRecommend)
rec_badges = [card.rect for card in rec_badges]
if rec_badges:
matches = badge.match(cards, rec_badges, 'mb')
logger.debug("Recommend card badge matches: %s", matches)
# 选第一个推荐卡
target_match = next(filter(lambda m: m.badge is not None, matches), None)
if target_match:
target_card = target_match.object
else:
target_card = cards[0]
else:
logger.debug("No recommend badge found. Pick first card.")
target_card = cards[0]
return target_card
@action('刷新技能卡', screenshot_mode='manual-inherit')
def try_refresh_skill_card(first_card: Rect) -> bool:
"""
尝试刷新
:param first_card: 选择除去的卡的位置
:return: True为成功刷新
"""
device.screenshot()
it = Interval(0.5)
if ocr.find(contains("除去"), rect=RemoveSkillButton):
device.click(first_card)
device.screenshot()
it.wait()
device.click(RemoveSkillButton)
cd = Countdown(sec=5).start()
# 等待除去页面
while not ocr.find(contains("除去"), rect=RemoveConfirmButton):
if cd.expired():
break
device.screenshot()
it.wait()
device.click(RemoveConfirmButton)
cd.reset()
# 回到领取技能卡页面
while not image.find(R.InPurodyuusu.TextClaim):
if cd.expired():
break
device.click(10, 10)
device.screenshot()
it.wait()
logger.info("Remove success")
return True
if ocr.find(contains("再抽選"), rect=RefreshSkillButton):
device.click(RefreshSkillButton)
logger.info("Refresh success")
it.wait()
return True
logger.info("No Refresh")
it.wait()
return False

View File

@ -9,6 +9,7 @@ from kotonebot import (
sleep,
Interval,
)
from kotonebot.kaa.skill_card_action.select_skill_card import select_skill_card
from kotonebot.primitives import Rect
from kotonebot.kaa.tasks import R
from .p_drink import acquire_p_drink
@ -26,16 +27,16 @@ def acquire_skill_card():
# TODO: 识别卡片内容,而不是固定选卡
# TODO: 不硬编码坐标
logger.debug("Locating all skill cards...")
it = Interval()
cards = None
card_clicked = False
target_card = None
while True:
device.screenshot()
it.wait()
# 是否显示技能卡选择指导的对话框
# [kotonebot-resource/sprites/jp/in_purodyuusu/screenshot_show_skill_card_select_guide_dialog.png]
if image.find(R.InPurodyuusu.TextSkillCardSelectGuideDialogTitle):
@ -259,7 +260,7 @@ def fast_acquisitions() -> AcquisitionType | None:
logger.debug("Check skill card select...")
if image.find(R.InPurodyuusu.TextSkillCard):
logger.info("Acquire skill card found")
acquire_skill_card()
select_skill_card()
return "PSkillCardSelect"
# P物品选择
logger.debug("Check PItem select...")

View File

@ -3,6 +3,7 @@ from itertools import cycle
from typing import Optional, Literal
from typing_extensions import assert_never
from kotonebot.kaa.skill_card_action.global_idol_setting_action import update_archetype_by_idol_skin_id
from kotonebot.ui import user
from kotonebot.kaa.tasks import R
from kotonebot.kaa.common import conf
@ -11,7 +12,7 @@ from ..actions.scenes import at_home, goto_home
from kotonebot.backend.loop import Loop, StatedLoop
from kotonebot.util import Countdown, Interval, Throttler
from kotonebot.kaa.game_ui.primary_button import find_button
from kotonebot.kaa.game_ui.idols_overview import locate_idol, match_idol
from kotonebot.kaa.game_ui.idols_overview import locate_idol, match_idol, find_idol_skin_id_on_resume_produce
from ..produce.in_purodyuusu import hajime_pro, hajime_regular, hajime_master, resume_pro_produce, resume_regular_produce, \
resume_master_produce
from kotonebot import device, image, ocr, task, action, sleep, contains, regex
@ -166,6 +167,12 @@ def resume_produce():
raise ValueError('Failed to detect weeks after multiple retries.')
if current_week is None:
raise ValueError('Failed to detect current_week.')
# 更新全局偶像信息
img = device.screenshot()
skin_id = find_idol_skin_id_on_resume_produce(img)
update_archetype_by_idol_skin_id(skin_id)
# 点击 再開する
# [kotonebot-resource/sprites/jp/produce/produce_resume.png]
logger.info('Click resume button.')
@ -218,6 +225,8 @@ def do_produce(
device.click(R.Produce.BoxProduceOngoing)
sleep(2)
# 新培育时更新偶像信息
update_archetype_by_idol_skin_id(idol_skin_id)
# 0. 进入培育页面
logger.info(f'Enter produce page. Mode: {mode}')
match mode:

View File

@ -4,24 +4,87 @@
import os
import sys
import tqdm
import shutil
import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, Tuple
import cv2
import requests
import urllib3
from kotonebot.kaa.db.constants import CharacterId
sys.path.append(os.path.abspath('./submodules/GkmasObjectManager'))
import GkmasObjectManager as gom # type: ignore
print("拉取资源...")
print('拉取清单文件...')
manifest = gom.fetch()
print("提取 P 偶像卡资源...")
base_path = './kotonebot/kaa/resources/idol_cards'
os.makedirs(base_path, exist_ok=True)
# 定义下载任务类型:(资源ID, 下载路径, 下载完成后调用的函数)
DownloadTask = Tuple[str, str, Callable[[str], None] | None]
download_tasks: List[DownloadTask] = []
MAX_RETRY_COUNT = 5
MAX_WORKERS = 4 # 最大并发下载数
def download_to(asset_id: str, path: str, overwrite: bool = False):
"""单个文件下载函数"""
retry_count = 1
while True:
try:
if not overwrite and os.path.exists(path):
print(f'Skipped {asset_id}.')
return
manifest.download(asset_id, path=path, categorize=False)
break
except requests.exceptions.ReadTimeout | requests.exceptions.SSLError | requests.exceptions.ConnectionError | urllib3.exceptions.MaxRetryError as e:
retry_count += 1
if retry_count >= MAX_RETRY_COUNT:
raise e
print(f'Network error: {e}')
print('Retrying...')
def run(tasks: List[DownloadTask], description: str = "下载中") -> None:
"""并行执行下载任务列表"""
def _download(task: DownloadTask) -> None:
asset_id, path, post_process_func = task
try:
download_to(asset_id, path)
if post_process_func is not None:
post_process_func(path)
except Exception as e:
print(f'Failed to download {asset_id}: {e}')
raise
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_task = {executor.submit(_download, task): task for task in tasks}
with tqdm.tqdm(total=len(tasks), desc=description) as pbar:
for future in as_completed(future_to_task):
future.result()
pbar.update(1)
# 创建目录
print("创建资源目录...")
IDOL_CARD_PATH = './kotonebot/kaa/resources/idol_cards'
SKILL_CARD_PATH = './kotonebot/kaa/resources/skill_cards'
os.makedirs(IDOL_CARD_PATH, exist_ok=True)
os.makedirs(SKILL_CARD_PATH, exist_ok=True)
db = sqlite3.connect("./kotonebot/kaa/resources/game.db")
def resize_idol_card_image(path: str) -> None:
"""偶像卡图片后处理:调整分辨率为 140x188"""
if os.path.exists(path):
img = cv2.imread(path)
if img is not None:
img = cv2.resize(img, (140, 188), interpolation=cv2.INTER_AREA)
cv2.imwrite(path, img)
################################################
# 1. 构建 P 偶像卡下载任务
print("添加 P 偶像卡任务...")
cursor = db.execute("""
SELECT
IC.id AS cardId,
@ -36,40 +99,122 @@ JOIN Character Char ON characterId = Char.id
JOIN IdolCardSkin ICS ON IC.id = ICS.idolCardId;
""")
print("下载 P 偶像卡资源...")
for row in tqdm.tqdm(cursor.fetchall()):
for row in tqdm.tqdm(cursor.fetchall(), desc="构建偶像卡任务"):
_, skin_id, name, asset_id, _, _ = row
# 下载资源
# 低特训等级
asset_id0 = f'img_general_{asset_id}_0-thumb-portrait'
path0 = base_path + f'/{skin_id}_0.png'
# 高特训等级
asset_id1 = f'img_general_{asset_id}_1-thumb-portrait'
path1 = base_path + f'/{skin_id}_1.png'
if asset_id is None:
raise ValueError(f"未找到P偶像卡资源{skin_id} {name}")
while True:
# 低特训等级
asset_id0 = f'img_general_{asset_id}_0-thumb-portrait'
path0 = IDOL_CARD_PATH + f'/{skin_id}_0.png'
download_tasks.append((asset_id0, path0, resize_idol_card_image))
# 高特训等级
asset_id1 = f'img_general_{asset_id}_1-thumb-portrait'
path1 = IDOL_CARD_PATH + f'/{skin_id}_1.png'
download_tasks.append((asset_id1, path1, resize_idol_card_image))
# 2. 构建技能卡下载任务
print("添加技能卡任务...")
cursor = db.execute("""
SELECT
DISTINCT assetId,
isCharacterAsset
FROM ProduceCard;
""")
for row in tqdm.tqdm(cursor.fetchall(), desc="构建技能卡任务"):
asset_id, is_character_asset = row
assert asset_id is not None
if not is_character_asset:
path = SKILL_CARD_PATH + f'/{asset_id}.png'
download_tasks.append((asset_id, path, None))
else:
for ii in CharacterId:
actual_asset_id = f'{asset_id}-{ii.value}'
path = SKILL_CARD_PATH + f'/{actual_asset_id}.png'
download_tasks.append((actual_asset_id, path, None))
print(f'开始下载 {len(download_tasks)} 个资源,并发数 {MAX_WORKERS}...')
run(download_tasks)
################################################
# 检查下载结果并重试失败的文件
################################################
def check_downloaded_files(tasks: List[DownloadTask]) -> List[DownloadTask]:
"""检查所有下载的文件,返回需要重试的任务列表"""
failed_tasks = []
print("检查下载的文件...")
for task in tqdm.tqdm(tasks, desc="检查文件"):
_, path, _ = task
# 检查文件是否存在
if not os.path.exists(path):
print(f"文件不存在: {path}")
failed_tasks.append(task)
continue
# 使用 OpenCV 读取图片检查是否为空
try:
if not os.path.exists(path0):
manifest.download(asset_id0, path=path0, categorize=False)
# 转换分辨率 140x188
img0 = cv2.imread(path0)
assert img0 is not None
img0 = cv2.resize(img0, (140, 188), interpolation=cv2.INTER_AREA)
cv2.imwrite(path0, img0)
else:
print(f'Skipped {path0}')
if not os.path.exists(path1):
manifest.download(asset_id1, path=path1, categorize=False)
# 转换分辨率 140x188
img1 = cv2.imread(path1)
assert img1 is not None
img1 = cv2.resize(img1, (140, 188), interpolation=cv2.INTER_AREA)
cv2.imwrite(path1, img1)
else:
print(f'Skipped {path1}')
break
except requests.exceptions.ReadTimeout as e:
print(f'Network error: {e}')
print('Retrying...')
img = cv2.imread(path)
if img is None:
print(f"OpenCV 无法读取文件: {path}")
failed_tasks.append(task)
continue
# 检查图片尺寸是否合理
if img.shape[0] == 0 or img.shape[1] == 0:
print(f"图片尺寸异常: {path}, 尺寸: {img.shape}")
failed_tasks.append(task)
continue
except Exception as e:
print(f"检查文件时出错: {path}, 错误: {e}")
failed_tasks.append(task)
continue
return failed_tasks
# 执行检查和重试
max_retry_rounds = 3
retry_round = 0
failed_tasks = []
while retry_round < max_retry_rounds:
failed_tasks = check_downloaded_files(download_tasks)
if not failed_tasks:
print("所有文件验证成功!")
break
print(f"发现 {len(failed_tasks)} 个失败的文件,开始第 {retry_round + 1} 轮重试...")
# 删除失败的文件,准备重新下载
for task in failed_tasks:
_, path, _ = task
if os.path.exists(path):
try:
os.remove(path)
print(f"删除损坏文件: {path}")
except Exception as e:
print(f"删除文件失败: {path}, 错误: {e}")
# 重新下载失败的文件
try:
run(failed_tasks, f"重试下载 (第 {retry_round + 1} 轮)")
retry_round += 1
except Exception as e:
print(f"重试下载时出错: {e}")
break
if failed_tasks:
print(f"警告:仍有 {len(failed_tasks)} 个文件下载失败:")
for task in failed_tasks:
asset_id, path, _ = task
print(f" - {asset_id} -> {path}")
db.close()