SchedulingSimulator/commit_task_jointcloud_sonc...

772 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import concurrent.futures
import time
import logging
import threading
from uuid import uuid4
from typing import Dict, List, Optional
import requests
import os
import json
# -------------------------- Global configuration and constants --------------------------
# Logging configuration: INFO level, timestamped lines, emitted to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
# Module-level logger shared by every function/thread in this file.
logger = logging.getLogger(__name__)
# Task status definitions. NOTE: the *values* are Chinese display strings that
# are stored and compared at runtime — do not change them.
TASK_STATUS = {
    "SUBMITTED": "待提交",  # initial state: waiting to be submitted
    "SUBMITTING": "提交中",  # submission pipeline in progress
    "SUCCEED": "提交成功",  # confirmed successful by the joint-cloud platform
    "FAILED": "提交失败",  # confirmed failed by the joint-cloud platform
    "RETRY_EXHAUSTED": "重试耗尽"  # fail_count reached max_fail_threshold
}
# Global task registry: key = target_id, value = task detail dict (TaskInfo).
task_map: Dict[str, Dict] = {}
task_map_lock = threading.Lock()  # guards every read/write of task_map entries
# API endpoint configuration: key = logical operation,
# value = endpoint URL + request timeout in seconds.
API_CONFIG = {
    "login": {
        "url": "http://119.45.255.234:30180/jcc-admin/admin/login",
        "timeout": 10
    },
    "create_package": {
        "url": "http://119.45.255.234:30180/jsm/jobSet/createPackage",
        "timeout": 15
    },
    "upload_file": {
        "url": "http://119.45.255.234:30180/jcs/object/upload",
        # Very large timeout: dataset uploads can take a long time.
        "timeout": 3000
    },
    "notify_upload": {
        "url": "http://119.45.255.234:30180/jsm/jobSet/notifyUploaded",
        "timeout": 15
    },
    "bind_cluster": {
        "url": "http://119.45.255.234:30180/jsm/jobSet/binding",
        "timeout": 15
    },
    "query_binding": {
        "url": "http://119.45.255.234:30180/jsm/jobSet/queryBinding",
        "timeout": 15
    },
    "submit_task": {
        "url": "http://119.45.255.234:30180/jsm/jobSet/submit",
        "timeout": 15
    },
    "task_details": {
        "url": "http://119.45.255.234:30180/pcm/v1/core/task/details",
        "timeout": 15
    }
}
# Cluster resource bookkeeping: key = cluster ID,
# value = total capacity and currently-available capacity per resource type.
# select_cluster() decrements "available" on reservation; the failure path of
# submit_single_task() increments it back.
cluster_resources: Dict[str, Dict] = {
    "1790300942428540928": {  # modelarts cluster
        "total": {"CPU": 512, "MEMORY": 1024, "NPU": 8},
        "available": {"CPU": 512, "MEMORY": 1024, "NPU": 8}
    }
    # Additional clusters kept for reference, currently disabled:
    # "1865927992266461184": {  # openi cluster
    # "total": {"CPU": 48, "MEMORY": 512, "DCU": 1},
    # "available": {"CPU": 24, "MEMORY": 256, "DCU": 1}
    # },
    # "1865927992266462181": {  # octopus cluster
    # "total": {"CPU": 48, "MEMORY": 512, "DCU": 1},
    # "available": {"CPU": 24, "MEMORY": 256, "DCU": 1}
    # },
    # "1777240145309732864": {  # sugon cluster
    # "total": {"CPU": 48, "MEMORY": 512, "NPU": 1},
    # "available": {"CPU": 24, "MEMORY": 256, "NPU": 1}
    # },
}
cluster_lock = threading.Lock()  # guards cluster_resources mutations
# -------------------------- 数据结构定义 --------------------------
class DatasetInfo(dict):
    """Dataset descriptor, keyed by its local file location.

    Subclasses ``dict`` so instances serialize and inspect like the other
    *Info structures in this module.
    """
    def __init__(self, file_location: str, name: str, size: float, **kwargs):
        super().__init__()
        self.update({
            "file_location": file_location,                      # local path (primary key)
            "id": kwargs.get("id", str(uuid4())),                # unique dataset identifier
            "name": name,                                        # dataset display name
            "size": size,                                        # size in bytes
            "is_uploaded": kwargs.get("is_uploaded", False),     # already uploaded?
            "upload_cluster": kwargs.get("upload_cluster", []),  # clusters uploaded to
            "upload_time": kwargs.get("upload_time"),            # upload timestamp, if any
            "description": kwargs.get("description"),            # free-form description
        })
class AlgorithmInfo(dict):
    """Algorithm descriptor: owning cluster plus its identifiers."""
    def __init__(self, cluster: str, id: str, name: str, **kwargs):
        # dict.__init__ with keywords populates the mapping directly.
        super().__init__(
            cluster=cluster,                  # owning cluster
            id=id,                            # unique algorithm identifier
            son_id=kwargs.get("son_id", ""),  # sub-algorithm id (optional)
            name=name,                        # algorithm display name
        )
class TaskInfo(dict):
    """Mutable record describing one training task through its whole lifecycle."""
    def __init__(self, task_name: str, dataset_name: str, son_code_id: int, resource: Dict, **kwargs):
        super().__init__()
        fields = {
            "target_id": kwargs.get("target_id", str(uuid4())),  # unique local task id
            "task_name": task_name,
            # Folder/package name derived from the task name unless overridden.
            "package_name": kwargs.get("package_name", f"{task_name.lower()}-pkg"),
            "dataset_name": dataset_name,        # associated dataset file name
            "son_code_id": son_code_id,          # sub-algorithm id, taken from the template
            "resource": resource,                # resource demand (CPU/MEMORY/NPU, ...)
            "status": TASK_STATUS["SUBMITTED"],  # initial state: waiting for submission
            "submit_time": kwargs.get("submit_time", time.strftime("%Y-%m-%d %H:%M:%S")),
            "success_time": None,                # filled in when the task succeeds
            "third_party_task_id": "",           # joint-cloud task id, filled after submit
            "file_location": kwargs.get("file_location", ""),  # local dataset directory
            "error_msg": "",                     # last failure description
            "fail_count": 0,                     # consecutive submission failures
            "max_fail_threshold": kwargs.get("max_fail_threshold", 3),  # retry limit
            "cluster_id": "",                    # chosen cluster, filled at submit time
        }
        self.update(fields)
# -------------------------- 工具方法 --------------------------
def generate_task_templates() -> List[Dict]:
    """Build the static task templates (includes each task's son_code_id).

    Every task shares the same name pattern and resource demand, so only the
    per-task fields are tabulated and then expanded into full template dicts.

    NOTE(review): the AF-AK entries appear twice (and the first batch has no
    "AG" while the second does) — preserved as-is; presumably an intentional
    doubled workload for the simulation, but worth confirming.
    """
    name_template = "{prefix}-jointCloudAi-trainingtask"
    base_resource = {"CPU": 24, "MEMORY": 24, "NPU": 1}
    # (prefix, dataset_name, son_code_id, file_location)
    specs = [
        ("AA", "data1.zip", 1165, "D:/数据集/cnn数据集/data1/"),
        ("AB", "cifar-10-python.tar.gz", 1167, "D:/数据集/cnn数据集/data2/"),
        ("AC", "cifar-100-python.tar.gz", 1169, "D:/数据集/cnn数据集/data3/"),
        ("AD", "dev.jsonl", 1171, "D:/数据集/transfomer数据集/BoolQ/"),
        ("AE", "ceval.zip", 1173, "D:/数据集/transfomer数据集/CEval/"),
        ("AF", "CMMLU.zip", 1178, "D:/数据集/transfomer数据集/CMMLU/"),
        ("AH", "mental_health.csv", 1175, "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/"),
        ("AI", "GSM8K.jsonl", 1180, "D:/数据集/transfomer数据集/GSM8K/GSM8K/"),
        ("AJ", "human-eval.jsonl", 1182, "D:/数据集/transfomer数据集/HumanEval/"),
        ("AK", "HumanEval_X.zip", 1184, "D:/数据集/transfomer数据集/HumanEval_X/"),
        ("AF", "ceval.zip", 1173, "D:/数据集/transfomer数据集/CEval/"),
        ("AG", "CMMLU.zip", 1178, "D:/数据集/transfomer数据集/CMMLU/"),
        ("AH", "mental_health.csv", 1175, "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/"),
        ("AI", "GSM8K.jsonl", 1180, "D:/数据集/transfomer数据集/GSM8K/GSM8K/"),
        ("AJ", "human-eval.jsonl", 1182, "D:/数据集/transfomer数据集/HumanEval/"),
        ("AK", "HumanEval_X.zip", 1184, "D:/数据集/transfomer数据集/HumanEval_X/"),
    ]
    return [
        {
            "task_name_template": name_template,
            "prefix": prefix,
            "dataset_name": dataset,
            "son_code_id": code_id,
            # Fresh copy per task: templates must not share a mutable dict.
            "resource": dict(base_resource),
            "file_location": location,
        }
        for prefix, dataset, code_id, location in specs
    ]
def load_tasks_to_queue(templates: List[Dict]) -> None:
    """Validate each template and (re)populate the global task_map with TaskInfo entries.

    Templates missing any required field are skipped with a warning; any other
    per-template error is logged and the remaining templates still load.
    """
    global task_map
    required = ("task_name_template", "prefix", "dataset_name",
                "son_code_id", "resource", "file_location")
    with task_map_lock:
        task_map.clear()
        for idx, template in enumerate(templates):
            try:
                missing_fields = [field for field in required if field not in template]
                if missing_fields:
                    logger.warning(f"跳过无效任务模板(索引{idx}):缺少字段 {missing_fields}")
                    continue
                task_name = template["task_name_template"].format(prefix=template["prefix"])
                entry = TaskInfo(
                    task_name=task_name,
                    dataset_name=template["dataset_name"],
                    son_code_id=template["son_code_id"],  # sub-algorithm id straight from the template
                    resource=template["resource"],
                    file_location=template["file_location"],
                )
                task_map[entry["target_id"]] = entry
                logger.info(f"任务入队 | task_name: {task_name} | target_id: {entry['target_id']}")
            except Exception as e:
                logger.error(f"加载任务模板失败(索引{idx}{str(e)}")
    logger.info(f"任务队列加载完成 | 共 {len(task_map)} 个有效任务")
def select_cluster(task_resource: Dict) -> Optional[str]:
    """Reserve the first cluster whose free resources cover ``task_resource``.

    Returns the chosen cluster id after decrementing its available pool, or
    None when no cluster can satisfy the demand. The matching logic treats a
    resource type the cluster lacks as 0 available, so heterogeneous
    accelerator keys (NPU/DCU/...) are handled uniformly.
    """
    with cluster_lock:
        for cluster_id, cluster in cluster_resources.items():
            free = cluster["available"]
            fits = all(free.get(res_type, 0) >= amount
                       for res_type, amount in task_resource.items())
            if fits:
                # Reserve: subtract the demand from the cluster's free pool.
                for res_type, amount in task_resource.items():
                    if res_type in free:
                        free[res_type] -= amount
                logger.info(f"选中集群 {cluster_id},更新后可用资源: {free}")
                return cluster_id
        logger.warning(f"无满足资源需求的集群 | 任务需求: {task_resource}")
        return None
# -------------------------- API调用方法 --------------------------
def get_token() -> Optional[str]:
    """Obtain an auth token from the admin login endpoint.

    Returns:
        The bearer token string, or None on any request/response failure.

    Credentials default to the values previously hardcoded here but can now be
    overridden via the JCC_USERNAME / JCC_PASSWORD environment variables, so
    real secrets need not live in source control.
    """
    login_payload = {
        "username": os.environ.get("JCC_USERNAME", "admin"),
        "password": os.environ.get("JCC_PASSWORD", "Nudt@123"),
    }
    try:
        config = API_CONFIG["login"]
        response = requests.post(config["url"], json=login_payload, timeout=config["timeout"])
        response.raise_for_status()
        result = response.json()
        # Expected success shape: {"code": 200, "data": {"token": "..."}}.
        # `or {}` also guards against "data" being present but null.
        if result.get("code") == 200 and "token" in (result.get("data") or {}):
            logger.info("Token获取成功")
            return result["data"]["token"]
        logger.error(f"Token获取失败 | 响应: {result}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"登录请求异常: {str(e)}", exc_info=True)
        return None
def _release_cluster_resources(task: Dict, cluster_id: Optional[str]) -> None:
    """Return the resources reserved for `task` to `cluster_id`'s available pool.

    Safe to call with an empty/unknown cluster_id (no-op in that case).
    """
    with cluster_lock:
        if cluster_id in cluster_resources:
            for res_type, required in task["resource"].items():
                if res_type in cluster_resources[cluster_id]["available"]:
                    cluster_resources[cluster_id]["available"][res_type] += required
            logger.info(f"任务失败,释放集群 {cluster_id} 资源: {task['resource']}")


def _post_json(error_prefix: str, config_key: str, payload: Dict, headers: Dict) -> Dict:
    """POST `payload` to the API_CONFIG endpoint `config_key` and return parsed JSON.

    Raises ValueError (prefixed with `error_prefix`) when the platform reports a
    non-"OK" business code; HTTP-level errors from requests propagate to the caller.
    """
    config = API_CONFIG[config_key]
    response = requests.post(config["url"], json=payload, headers=headers, timeout=config["timeout"])
    response.raise_for_status()
    result = response.json()
    if result.get("code") != "OK":
        raise ValueError(f"{error_prefix} | API返回: {result}")
    return result


def submit_single_task(task: Dict) -> bool:
    """Run the six-step joint-cloud submission pipeline for one task.

    Steps: create package -> upload dataset -> notify uploaded -> bind to the
    cluster -> resolve the binding id -> submit the training job.

    FIX: all failures — including a failed token fetch or a missing cluster id,
    which previously returned early — now flow through a single handler that
    releases the cluster resources reserved by select_cluster() and updates
    fail_count/status under task_map_lock. The old early returns leaked the
    reservation and never counted toward max_fail_threshold, so such a task
    could retry forever while holding capacity.

    Returns:
        True when the platform accepted the job, False otherwise.
    """
    cluster_id = task.get("cluster_id")
    task_name = task["task_name"]
    package_name = task["package_name"]
    file_name = task["dataset_name"]
    son_code_id = task["son_code_id"]  # sub-algorithm id, taken from the template
    file_path = os.path.join(task["file_location"], file_name)
    try:
        token = get_token()
        if not token:
            raise ValueError("获取Token失败")
        if not cluster_id:
            raise ValueError("未指定集群ID")
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {token}'
        }
        # Step 1: create the dataset package/folder on the selected cluster.
        create_payload = {
            "userID": 5,
            "name": package_name,
            "dataType": "dataset",
            "packageID": 0,
            "uploadPriority": {"type": "specify", "clusters": [cluster_id]},
            "bindingInfo": {
                "clusterIDs": [cluster_id],
                "name": package_name,
                "category": "image",
                "type": "dataset",
                "imageID": "",
                "bias": [],
                "region": [],
                "chip": ["ASCEND"],
                "selectedCluster": [],
                "modelType": "",
                "env": "",
                "version": "",
                "packageID": 0,
                "points": 0
            }
        }
        create_result = _post_json("创建文件夹失败", "create_package", create_payload, headers)
        packageID = create_result["data"]["newPackage"]["packageID"]
        logger.info(f"[{task_name}] 第一步:创建文件夹成功 | packageID: {packageID} | 集群: {cluster_id}")
        # Step 2: upload the dataset file (multipart — auth header only, no JSON content-type).
        config = API_CONFIG["upload_file"]
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据集文件不存在 | path: {file_path}")
        info_data = {
            "userID": 5,
            "packageID": packageID,
            "loadTo": [3],
            "loadToPath": [f"/dataset/5/{package_name}/"]
        }
        file_headers = {'Authorization': f'Bearer {token}'}
        with open(file_path, 'rb') as f:
            form_data = {"info": (None, json.dumps(info_data)), "files": f}
            upload_resp = requests.post(config["url"], files=form_data, headers=file_headers, timeout=config["timeout"])
        upload_resp.raise_for_status()
        upload_result = upload_resp.json()
        if upload_result.get("code") != "OK":
            raise ValueError(f"文件上传失败 | API返回: {upload_result}")
        object_id = upload_result["data"]["uploadeds"][0]["objectID"]
        logger.info(f"[{task_name}] 第二步:文件上传成功 | objectID: {object_id}")
        # Step 3: tell the platform the upload is complete.
        notify_payload = {
            "userID": 5,
            "packageID": packageID,
            "uploadParams": {
                "dataType": "dataset",
                "uploadInfo": {"type": "local", "localPath": file_name, "objectIDs": [object_id]}
            }
        }
        _post_json("通知上传完成失败", "notify_upload", notify_payload, headers)
        logger.info(f"[{task_name}] 第三步:通知上传完成成功")
        # Step 4: bind the dataset package to the selected cluster.
        bind_payload = {
            "userID": 5,
            "info": {"type": "dataset", "packageID": packageID, "clusterIDs": [cluster_id]}
        }
        _post_json("绑定集群失败", "bind_cluster", bind_payload, headers)
        logger.info(f"[{task_name}] 第四步:数据集绑定集群 {cluster_id} 成功")
        # Step 5: look up the binding id the submit call needs.
        query_bind_payload = {
            "dataType": "dataset",
            "param": {"userID": 5, "bindingID": -1, "type": "private"}
        }
        query_result = _post_json("查询绑定失败", "query_binding", query_bind_payload, headers)
        dataset_target_id = None
        for data in query_result["data"]["datas"]:
            if data["info"]["name"] == package_name:
                dataset_target_id = data["ID"]
                break
        if not dataset_target_id:
            raise ValueError(f"未找到package_name={package_name}的绑定ID")
        logger.info(f"[{task_name}] 第五步获取绑定ID成功 | target_id: {dataset_target_id}")
        # Step 6: submit the training job using the template's son_code_id.
        task_res = task["resource"]
        submit_payload = {
            "userID": 5,
            "jobSetInfo": {
                "jobs": [
                    {
                        "localJobID": "1",  # matches the platform's sample request
                        "name": task_name,
                        "description": "自动提交的训练任务",
                        "type": "AI",
                        "files": {
                            "dataset": {"type": "Binding", "bindingID": dataset_target_id},
                            "model": {"type": "Binding", "bindingID": ""},  # no model binding
                            "image": {"type": "Image", "imageID": 11}
                        },
                        "jobResources": {
                            "scheduleStrategy": "dataLocality",
                            "clusters": [
                                {
                                    "clusterID": cluster_id,  # dynamically selected cluster
                                    "runtime": {"envs": {}, "params": {}},
                                    "code": {"type": "Binding", "bindingID": son_code_id},
                                    "resources": [
                                        {"type": "CPU", "name": "ARM", "number": task_res["CPU"]},
                                        {"type": "MEMORY", "name": "RAM", "number": task_res["MEMORY"]},
                                        {"type": "MEMORY", "name": "VRAM", "number": 32},
                                        {"type": "STORAGE", "name": "DISK", "number": 886},
                                        {
                                            "type": "NPU",
                                            "name": "ASCEND910",
                                            # A template without an NPU demand submits 0.
                                            "number": task_res.get("NPU", 0)
                                        }
                                    ]
                                }
                            ]
                        }
                    }
                ]
            }
        }
        submit_result = _post_json("任务提交失败", "submit_task", submit_payload, headers)
        third_party_task_id = submit_result.get('data', {}).get('jobSetID')
        logger.info(f"[{task_name}] 第六步:任务提交至集群 {cluster_id} 成功 | 云际任务ID: {third_party_task_id}")
        with task_map_lock:
            task["status"] = TASK_STATUS["SUCCEED"]
            task["third_party_task_id"] = third_party_task_id
        return True
    except Exception as e:
        error_msg = f"提交失败: {str(e)}"
        # Give back the reservation made by select_cluster() so capacity isn't leaked.
        _release_cluster_resources(task, cluster_id)
        with task_map_lock:
            task["fail_count"] += 1
            if task["fail_count"] >= task["max_fail_threshold"]:
                task["status"] = TASK_STATUS["RETRY_EXHAUSTED"]
            else:
                task["status"] = TASK_STATUS["FAILED"]
            task["error_msg"] = error_msg
        logger.error(f"[{task_name}] {error_msg}", exc_info=True)
        return False
def query_third_party_task_status(third_party_task_id: str) -> Optional[str]:
    """Query the joint-cloud platform for a task's status.

    Returns the status string of the first sub-task, or None when the id is
    empty, the request fails, the business code is not "OK", or the response
    carries no sub-task data.
    """
    if not third_party_task_id:
        logger.warning("云际任务ID为空无法查询状态")
        return None
    try:
        config = API_CONFIG["task_details"]
        token = get_token()
        auth_headers = {"Authorization": f"Bearer {token}"} if token else {}
        response = requests.get(
            config["url"],
            params={"id": third_party_task_id},
            headers=auth_headers,
            timeout=config["timeout"],
        )
        response.raise_for_status()
        result = response.json()
        if result.get("code") != "OK":
            logger.error(f"查询任务状态失败 | 任务ID: {third_party_task_id} | 响应: {result}")
            return None
        sub_tasks = result.get("data", {}).get("subTaskInfos", [])
        if not sub_tasks:
            logger.warning(f"任务 {third_party_task_id} 未找到subTaskInfos数据")
            return None
        # Only the first sub-task's status is reported, as before.
        return sub_tasks[0].get("status")
    except requests.exceptions.RequestException as e:
        logger.error(f"查询任务状态请求异常 | 任务ID: {third_party_task_id} | 错误: {str(e)}", exc_info=True)
        return None
    except (KeyError, IndexError) as e:
        logger.error(f"解析任务状态响应失败 | 任务ID: {third_party_task_id} | 错误: {str(e)}", exc_info=True)
        return None
# -------------------------- 线程一:任务监控线程 --------------------------
class TaskMonitorThread(threading.Thread):
    """Monitoring thread: polls the joint-cloud platform for SUBMITTING tasks.

    Every `check_interval` seconds it snapshots task_map, queries each
    in-flight task's remote status, and transitions it to SUCCEED / FAILED.
    Stops itself once no task can progress any further.
    """
    def __init__(self, check_interval: int = 10):
        super().__init__(name="TaskMonitorThread")
        self.check_interval = check_interval  # seconds between polling rounds
        self._stop_event = threading.Event()

    def run(self) -> None:
        logger.info(f"监控线程启动 | 监控间隔: {self.check_interval}")
        while not self._stop_event.is_set():
            with task_map_lock:
                tasks = list(task_map.values())
            for task in tasks:
                with task_map_lock:
                    current_status = task["status"]
                    third_party_id = task["third_party_task_id"]
                # Only tasks mid-submission are polled; SUBMITTED waits for the
                # submit thread, terminal states need no monitoring.
                if current_status != TASK_STATUS["SUBMITTING"]:
                    continue
                if not third_party_id:
                    logger.warning(f"任务 {task['task_name']} 无云际ID跳过状态查询")
                    continue
                third_status = query_third_party_task_status(third_party_id)
                with task_map_lock:
                    # BUGFIX: was `third_status == "Succeeded" or "Completed"`,
                    # which is always truthy, so every polled task was marked
                    # successful regardless of its real remote state.
                    if third_status in ("Succeeded", "Completed"):
                        task["status"] = TASK_STATUS["SUCCEED"]
                        task["success_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
                        logger.info(
                            f"任务状态更新 | task_name: {task['task_name']} | 提交成功 | 成功时间: {task['success_time']}")
                    elif third_status == "Failed":
                        task["status"] = TASK_STATUS["FAILED"]
                        task["fail_count"] += 1
                        task["error_msg"] = f"云际任务执行失败(第{task['fail_count']}次)"
                        logger.warning(
                            f"任务状态更新 | task_name: {task['task_name']} | 提交失败 | 失败次数: {task['fail_count']}/{task['max_fail_threshold']}")
                    # NOTE(review): any other status (still running/pending, or a
                    # failed query returning None) leaves the task SUBMITTING so it
                    # is polled again — confirm the platform's status vocabulary.
            if self._check_all_tasks_completed():
                logger.info("所有任务已完成")
                self.stop()
            self._stop_event.wait(self.check_interval)
        logger.info("监控线程结束")

    def _check_all_tasks_completed(self) -> bool:
        """True when no task is pending/in-flight and no failed task can still retry."""
        with task_map_lock:
            for task in task_map.values():
                if task["status"] in [TASK_STATUS["SUBMITTED"], TASK_STATUS["SUBMITTING"]]:
                    return False
                if task["status"] == TASK_STATUS["FAILED"] and task["fail_count"] < task["max_fail_threshold"]:
                    return False
        return True

    def stop(self) -> None:
        """Signal the polling loop to exit at the next opportunity."""
        self._stop_event.set()
# -------------------------- 线程二:任务提交线程 --------------------------
class TaskSubmitThread(threading.Thread):
    """Submission thread: repeatedly collects submittable tasks and submits them concurrently.

    A task is submittable when it is SUBMITTED, or FAILED with fail_count still
    below its threshold. The thread exits once every task has reached a
    terminal state.
    """
    def __init__(self, max_workers: int = 3):
        super().__init__(name="TaskSubmitThread")
        self.max_workers = max_workers  # concurrent submissions per batch
        self._stop_event = threading.Event()

    def run(self) -> None:
        logger.info(f"提交线程启动 | 并发数: {self.max_workers}")
        while not self._stop_event.is_set():
            pending_tasks = self._collect_pending_tasks()
            if pending_tasks:
                self._submit_batch(pending_tasks)
            else:
                logger.info("无待提交任务,等待下次检查")
            # FIX: completion is now checked every cycle (not only inside the
            # futures loop), so the thread also exits when all tasks are already
            # terminal and the pending list is empty.
            if self._check_all_tasks_completed():
                logger.info("所有任务已完成,提交线程退出")
                self.stop()
                break
            self._stop_event.wait(5)
        logger.info("提交线程结束")

    def _collect_pending_tasks(self) -> List[Dict]:
        """Snapshot, under the task lock, every task eligible for (re)submission."""
        pending = []
        with task_map_lock:
            for task in task_map.values():
                status = task["status"]
                if status == TASK_STATUS["SUBMITTED"]:
                    pending.append(task)
                elif status == TASK_STATUS["FAILED"]:
                    if task["fail_count"] < task["max_fail_threshold"]:
                        pending.append(task)
                    else:
                        logger.info(
                            f"任务 {task['task_name']} 失败次数超阈值,停止提交")
        return pending

    def _submit_batch(self, pending_tasks: List[Dict]) -> None:
        """Submit one batch concurrently; a worker exception marks its task FAILED.

        FIX: the executor is only created when there is actual work — the
        original constructed a ThreadPoolExecutor on every polling cycle even
        with zero pending tasks.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.commit_task, task): task for task in pending_tasks}
            for future in concurrent.futures.as_completed(futures):
                task = futures[future]
                try:
                    future.result()
                except Exception as e:
                    with task_map_lock:
                        task["status"] = TASK_STATUS["FAILED"]
                        task["fail_count"] += 1
                        task["error_msg"] = f"提交过程异常: {str(e)}"
                    logger.error(f"任务提交异常 | task_name: {task['task_name']} | 错误: {str(e)}")

    def commit_task(self, task: Dict) -> None:
        """Per-task entry point: reserve a cluster, then run the submission pipeline."""
        cluster_id = select_cluster(task["resource"])
        if not cluster_id:
            with task_map_lock:
                task["status"] = TASK_STATUS["FAILED"]
                task["fail_count"] += 1
                task["error_msg"] = "无可用集群"
            logger.error(f"[{task['task_name']}] 提交失败:无可用集群")
            return
        # Mark in-flight and record the reservation before the slow pipeline runs.
        with task_map_lock:
            task["status"] = TASK_STATUS["SUBMITTING"]
            task["cluster_id"] = cluster_id
        logger.info(f"[{task['task_name']}] 开始提交至集群 {cluster_id}")
        if not submit_single_task(task):
            logger.warning(f"[{task['task_name']}] 提交失败,等待重试(当前失败次数:{task['fail_count']}")

    def _check_all_tasks_completed(self) -> bool:
        """True when no task is pending/in-flight and no failed task can still retry."""
        with task_map_lock:
            for task in task_map.values():
                if task["status"] in [TASK_STATUS["SUBMITTED"], TASK_STATUS["SUBMITTING"]]:
                    return False
                if task["status"] == TASK_STATUS["FAILED"] and task["fail_count"] < task["max_fail_threshold"]:
                    return False
        return True

    def stop(self) -> None:
        """Signal the polling loop to exit at the next opportunity."""
        self._stop_event.set()
# -------------------------- 主程序 --------------------------
if __name__ == "__main__":
    # Load the static templates into the shared queue, then run the monitor and
    # submitter threads until every task reaches a terminal state.
    load_tasks_to_queue(generate_task_templates())
    workers = [
        TaskMonitorThread(check_interval=10),
        TaskSubmitThread(max_workers=5),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    logger.info("所有任务处理完毕,程序退出")