ADD file via upload

This commit is contained in:
qiwang 2025-07-17 08:57:29 +08:00
parent aa7a52fc0b
commit fa1ed9757d
1 changed files with 544 additions and 0 deletions

View File

@ -0,0 +1,544 @@
import concurrent.futures
import time
import logging
import threading
from uuid import uuid4
from typing import Dict, List, Optional
# -------------------------- 全局配置与常量定义 --------------------------
# 日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# 任务状态定义
TASK_STATUS = {
"SUBMITTED": "待提交", # 初始状态
"SUBMITTING": "提交中", # 提交过程中
"SUCCEED": "提交成功", # 提交成功
"FAILED": "提交失败" # 提交失败
}
# 全局任务字典key=target_idvalue=任务详情)
task_map: Dict[str, Dict] = {}
task_map_lock = threading.Lock() # 任务字典线程锁
# 集群资源配置key=集群IDvalue=总资源/可用资源)
cluster_resources: Dict[str, Dict] = {
"1790300942428540928": { # modelarts集群
"total": {"CPU": 96, "MEMORY": 1024, "NPU": 2},
"available": {"CPU": 48, "MEMORY": 512, "NPU": 1}
},
"1865927992266461184": { # openi集群
"total": {"CPU": 48, "MEMORY": 512, "DCU": 1},
"available": {"CPU": 24, "MEMORY": 256, "DCU": 1}
},
"1865927992266462181": { # 章鱼集群
"total": {"CPU": 48, "MEMORY": 512, "DCU": 1},
"available": {"CPU": 24, "MEMORY": 256, "DCU": 1}
},
"1777240145309732864": { # 曙光集群
"total": {"CPU": 48, "MEMORY": 512, "NPU": 1},
"available": {"CPU": 24, "MEMORY": 256, "NPU": 1}
},
}
cluster_lock = threading.Lock() # 集群资源线程锁
# -------------------------- 数据结构定义 --------------------------
class DatasetInfo(dict):
"""数据集信息结构"""
def __init__(self, file_location: str, name: str, size: float, **kwargs):
super().__init__()
self["file_location"] = file_location # 本地路径(主键)
self["id"] = kwargs.get("id", str(uuid4())) # 数据集唯一标识
self["name"] = name # 数据集名称
self["size"] = size # 大小(字节)
self["is_uploaded"] = kwargs.get("is_uploaded", False) # 是否已上传
self["upload_cluster"] = kwargs.get("upload_cluster", []) # 上传的集群
self["upload_time"] = kwargs.get("upload_time") # 上传时间
self["description"] = kwargs.get("description") # 描述
class AlgorithmInfo(dict):
"""算法信息结构"""
def __init__(self, cluster: str, id: str, name: str, **kwargs):
super().__init__()
self["cluster"] = cluster # 所属集群
self["id"] = id # 算法唯一标识
self["son_id"] = kwargs.get("son_id", "") # 子算法ID
self["name"] = name # 算法名称
class TaskInfo(dict):
"""任务信息结构新增success_time字段记录成功时间"""
def __init__(self, task_name: str, dataset_name: str, code_id: str, resource: Dict, **kwargs):
super().__init__()
self["target_id"] = kwargs.get("target_id", str(uuid4())) # 任务唯一ID
self["task_name"] = task_name # 任务名称
self["package_name"] = kwargs.get("package_name", f"{task_name.lower()}-pkg") # 文件夹名称
self["dataset_name"] = dataset_name # 关联数据集名称
self["code_id"] = code_id # 算法ID
self["son_code_id"] = "" # 子算法ID提交时填充
self["resource"] = resource # 资源需求CPU/MEMORY/NPU等
self["status"] = TASK_STATUS["SUBMITTED"] # 初始状态:待提交
self["submit_time"] = kwargs.get("submit_time", time.strftime("%Y-%m-%d %H:%M:%S")) # 提交时间
self["success_time"] = None # 成功时间(成功时填充)
self["third_party_task_id"] = "" # 第三方任务ID提交后填充
self["file_location"] = kwargs.get("file_location", "") # 本地文件路径
self["error_msg"] = "" # 错误信息
self["fail_count"] = 0 # 失败次数原retry_count改为fail_count更贴合语义
self["max_fail_threshold"] = kwargs.get("max_fail_threshold", 3) # 最大失败阈值
self["cluster_id"] = "" # 提交的集群ID提交时填充
# -------------------------- 工具方法 --------------------------
def generate_task_templates() -> List[Dict]:
"""生成任务静态数据模板"""
return [
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AA",
"dataset_name": "data1.zip",
"code_id": "1164",
"file_location": "D:/数据集/cnn数据集/data1/",
"resource": {"CPU": 24, "MEMORY": 256, "NPU": 1}
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AB",
"dataset_name": "cifar-10-python.tar.gz",
"code_id": "1",
"file_location": "D:/数据集/cnn数据集/data2/",
"resource": {"CPU": 24, "MEMORY": 256, "NPU": 1}
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AC",
"dataset_name": "cifar-100-python.tar.gz",
"code_id": "1",
"file_location": "D:/数据集/cnn数据集/data3/",
"resource": {"CPU": 24, "MEMORY": 256, "NPU": 1}
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AD",
"dataset_name": "dev.jsonl",
"code_id": "2",
"file_location": "D:/数据集/transfomer数据集/BoolQ/",
"resource": {"CPU": 24, "MEMORY": 256, "NPU": 1}
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AE",
"dataset_name": "dev.jsonl",
"file_location": "D:/数据集/transfomer数据集/BoolQ/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AF",
"dataset_name": "ceval.zip",
"file_location": "D:/数据集/transfomer数据集/CEval/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AG",
"dataset_name": "CMMLU.zip",
"file_location": "D:/数据集/transfomer数据集/CMMLU/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AH",
"dataset_name": "mental_health.csv",
"file_location": "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AI",
"dataset_name": "GSM8K.jsonl",
"file_location": "D:/数据集/transfomer数据集/GSM8K/GSM8K/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AJ",
"dataset_name": "human-eval.jsonl",
"file_location": "D:/数据集/transfomer数据集/HumanEval/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AK",
"dataset_name": "HumanEval_X.zip",
"file_location": "D:/数据集/transfomer数据集/HumanEval_X/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AF",
"dataset_name": "ceval.zip",
"file_location": "D:/数据集/transfomer数据集/CEval/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AG",
"dataset_name": "CMMLU.zip",
"file_location": "D:/数据集/transfomer数据集/CMMLU/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AH",
"dataset_name": "mental_health.csv",
"file_location": "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AI",
"dataset_name": "GSM8K.jsonl",
"file_location": "D:/数据集/transfomer数据集/GSM8K/GSM8K/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AJ",
"dataset_name": "human-eval.jsonl",
"file_location": "D:/数据集/transfomer数据集/HumanEval/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AK",
"dataset_name": "HumanEval_X.zip",
"file_location": "D:/数据集/transfomer数据集/HumanEval_X/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
}
]
def load_tasks_to_queue(templates: List[Dict]) -> None:
"""将任务静态数据加载到任务队列task_map"""
global task_map
with task_map_lock:
task_map.clear()
for template in templates:
task_name = template["task_name_template"].format(prefix=template["prefix"])
task = TaskInfo(
task_name=task_name,
dataset_name=template["dataset_name"],
code_id=template["code_id"],
resource=template["resource"],
file_location=template["file_location"]
)
task_map[task["target_id"]] = task
logger.info(f"任务入队 | task_name: {task_name} | target_id: {task['target_id']}")
logger.info(f"任务队列加载完成 | 共 {len(task_map)} 个任务")
def select_cluster(task_resource: Dict) -> Optional[str]:
"""根据任务资源需求选择合适的集群"""
with cluster_lock:
for cluster_id, cluster in cluster_resources.items():
# 检查集群可用资源是否满足任务需求支持NPU/DCU等不同加速卡类型
resource_match = True
for res_type, required in task_resource.items():
# 集群可用资源中可能是NPU或DCU统一检查
available = cluster["available"].get(res_type, 0)
if available < required:
resource_match = False
break
if resource_match:
return cluster_id
logger.warning(f"无满足资源需求的集群 | 任务需求: {task_resource}")
return None
def get_son_code_id(cluster_id: str, code_id: str) -> str:
"""根据集群ID和算法ID查询子算法ID模拟接口查询"""
son_code_map = {
("1790300942428540928", "1"): "1-1",
("1790300942428540928", "2"): "2-1",
("1777240145309732864", "1"): "1-2",
("1865927992266461184", "2"): "2-2"
}
return son_code_map.get((cluster_id, code_id), f"{code_id}-default")
def get_auth_token() -> Optional[str]:
"""获取认证Token模拟接口"""
return "mock_valid_token"
def query_third_party_task_status(third_party_task_id: str) -> str:
"""查询云际平台任务状态(返回任务状态)"""
# 云际平台状态SUBMITTING提交中、SUCCEEDED成功、FAILED失败
mock_status_map = {
"task-1001": "SUCCEEDED",
"task-1002": "FAILED",
"task-1003": "SUBMITTING",
"task-1004": "SUCCEEDED",
"task-1005": "FAILED"
}
return mock_status_map.get(third_party_task_id, "SUBMITTING")
# -------------------------- 线程一:任务监控线程 --------------------------
class TaskMonitorThread(threading.Thread):
"""监控线程:专注监控任务状态,仅处理提交中任务的状态更新"""
def __init__(self, check_interval: int = 10):
super().__init__(name="TaskMonitorThread")
self.check_interval = check_interval # 监控间隔(秒)
self._stop_event = threading.Event()
def run(self) -> None:
logger.info(f"监控线程启动 | 监控间隔: {self.check_interval}")
while not self._stop_event.is_set():
with task_map_lock:
tasks = list(task_map.values()) # 复制任务列表,避免线程安全问题
for task in tasks:
with task_map_lock:
current_status = task["status"]
# 1. 待提交状态:不处理(由提交线程处理)
if current_status == TASK_STATUS["SUBMITTED"]:
continue
# 2. 提交中状态:定时查询第三方状态并更新
elif current_status == TASK_STATUS["SUBMITTING"]:
if not task["third_party_task_id"]:
logger.warning(f"任务 {task['task_name']} 无第三方ID跳过状态查询")
continue
# 查询第三方状态
third_status = query_third_party_task_status(task["third_party_task_id"])
with task_map_lock:
# 2.1 第三方状态为成功:更新为提交成功,记录成功时间
if third_status == "SUCCEEDED":
task["status"] = TASK_STATUS["SUCCEED"]
task["success_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
logger.info(
f"任务状态更新 | task_name: {task['task_name']} | 提交成功 | 成功时间: {task['success_time']}")
# 2.2 第三方状态为失败:更新为提交失败,失败次数+1
elif third_status == "FAILED":
task["status"] = TASK_STATUS["FAILED"]
task["fail_count"] += 1
task["error_msg"] = f"第三方任务执行失败(第{task['fail_count']}次)"
logger.warning(
f"任务状态更新 | task_name: {task['task_name']} | 提交失败 | 失败次数: {task['fail_count']}/{task['max_fail_threshold']}")
# 2.3 第三方状态为提交中:不更新状态
# 3. 提交成功状态:不处理
elif current_status == TASK_STATUS["SUCCEED"]:
continue
# 4. 提交失败状态:不处理(由提交线程判断是否重试)
elif current_status == TASK_STATUS["FAILED"]:
continue
# 检查是否所有任务已完成(成功或失败次数超阈值)
all_completed = self._check_all_tasks_completed()
if all_completed:
logger.info("所有任务已完成(成功或失败次数超过阈值)")
self.stop()
# 等待下次监控
self._stop_event.wait(self.check_interval)
logger.info("监控线程结束")
def _check_all_tasks_completed(self) -> bool:
"""检查是否所有任务已完成(成功或失败次数超阈值)"""
with task_map_lock:
for task in task_map.values():
# 待提交或提交中:未完成
if task["status"] in [TASK_STATUS["SUBMITTED"], TASK_STATUS["SUBMITTING"]]:
return False
# 提交失败但次数未超阈值:未完成(可能被提交线程重试)
if task["status"] == TASK_STATUS["FAILED"] and task["fail_count"] < task["max_fail_threshold"]:
return False
return True
def stop(self) -> None:
self._stop_event.set()
# -------------------------- 线程二:任务提交线程 --------------------------
class TaskSubmitThread(threading.Thread):
"""提交线程:按状态判断是否提交,处理待提交和未超阈值的失败任务"""
def __init__(self, max_workers: int = 3):
super().__init__(name="TaskSubmitThread")
self.max_workers = max_workers # 并发提交数
self._stop_event = threading.Event()
def run(self) -> None:
logger.info(f"提交线程启动 | 并发数: {self.max_workers}")
while not self._stop_event.is_set():
# 1. 筛选符合条件的任务:待提交 或 失败次数未超阈值的提交失败任务
with task_map_lock:
pending_tasks = []
for task in task_map.values():
status = task["status"]
# 1.1 待提交状态:直接提交
if status == TASK_STATUS["SUBMITTED"]:
pending_tasks.append(task)
# 1.2 提交失败状态:检查失败次数,未超阈值则提交
elif status == TASK_STATUS["FAILED"]:
if task["fail_count"] < task["max_fail_threshold"]:
pending_tasks.append(task)
else:
logger.info(
f"任务 {task['task_name']} 失败次数超阈值({task['max_fail_threshold']}),停止提交")
if not pending_tasks:
logger.info("无待提交任务,等待下次检查")
self._stop_event.wait(5)
continue
# 2. 并发提交任务
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self.commit_task, task): task for task in pending_tasks}
for future in concurrent.futures.as_completed(futures):
task = futures[future]
try:
future.result()
except Exception as e:
with task_map_lock:
task["status"] = TASK_STATUS["FAILED"]
task["fail_count"] += 1
task["error_msg"] = f"提交过程异常: {str(e)}"
logger.error(f"任务提交异常 | task_name: {task['task_name']} | 错误: {str(e)}")
logger.info("提交线程结束")
def commit_task(self, task: Dict) -> None:
"""提交单个任务(核心提交逻辑)"""
# 1. 标记任务状态为提交中
with task_map_lock:
task["status"] = TASK_STATUS["SUBMITTING"]
logger.info(f"开始提交任务 | task_name: {task['task_name']} | 当前状态: {task['status']}")
# 1.1 选择集群
cluster_id = select_cluster(task["resource"])
if not cluster_id:
with task_map_lock:
task["status"] = TASK_STATUS["FAILED"]
task["error_msg"] = "无满足资源需求的集群"
logger.error(f"任务提交失败 | task_name: {task['task_name']} | 原因: 无可用集群")
return
# 1.2 根据集群ID和算法ID查询子算法ID
son_code_id = get_son_code_id(cluster_id, task["code_id"])
if not son_code_id:
with task_map_lock:
task["status"] = TASK_STATUS["FAILED"]
task["error_msg"] = "未查询到子算法ID"
logger.error(f"任务提交失败 | task_name: {task['task_name']} | 原因: 子算法ID不存在")
return
# 1.3 获取认证Token
token = get_auth_token()
if not token:
with task_map_lock:
task["status"] = TASK_STATUS["FAILED"]
task["error_msg"] = "获取认证Token失败"
logger.error(f"任务提交失败 | task_name: {task['task_name']} | 原因: Token获取失败")
return
# 2. 模拟调用第三方接口提交任务实际场景替换为真实API
try:
# 生成第三方任务ID模拟接口返回
third_party_task_id = f"task-{hash(task['target_id'])}"
logger.info(f"第三方任务提交成功 | task_name: {task['task_name']} | 第三方ID: {third_party_task_id}")
# 3. 更新任务信息集群ID、子算法ID、第三方ID
with task_map_lock:
task["cluster_id"] = cluster_id
task["son_code_id"] = son_code_id
task["third_party_task_id"] = third_party_task_id
logger.info(
f"任务提交信息更新 | task_name: {task['task_name']} | 集群ID: {cluster_id} | 子算法ID: {son_code_id}")
except Exception as e:
with task_map_lock:
task["status"] = TASK_STATUS["FAILED"]
task["fail_count"] += 1
task["error_msg"] = f"第三方接口调用失败: {str(e)}"
logger.error(f"任务提交失败 | task_name: {task['task_name']} | 原因: {str(e)}")
def stop(self) -> None:
self._stop_event.set()
# -------------------------- 主程序 --------------------------
if __name__ == "__main__":
# 1. 生成任务静态数据
task_templates = generate_task_templates()
# 2. 读取任务进入队列
load_tasks_to_queue(task_templates)
# 3. 启动监控线程
monitor_thread = TaskMonitorThread(check_interval=10)
monitor_thread.start()
# 4. 启动提交线程
submit_thread = TaskSubmitThread(max_workers=3)
submit_thread.start()
# 5. 等待线程结束
monitor_thread.join()
submit_thread.join()
logger.info("所有任务处理完毕,程序退出")