# -*- coding: utf-8 -*-
"""Joint-cloud AI training-task submitter.

Reconstructed from the git patch "ADD file via upload"
(commit 82d175f, file ``commit_task_jointcloud_0716.py``).

The script loads a set of static training-task templates into a global task
map and then runs two cooperating threads:

* ``TaskSubmitThread`` — picks tasks that are pending (or failed under the
  retry threshold), selects a cluster, and drives the multi-step submission
  API flow (create package -> upload dataset -> notify -> bind -> query
  binding -> submit job set).
* ``TaskMonitorThread`` — polls the third-party platform for the status of
  in-flight tasks and marks them succeeded / failed.

Fixes applied relative to the uploaded version:

* ``TASK_STATUS`` now defines ``RETRY_EXHAUSTED`` — it was referenced by
  ``submit_single_task`` but never defined, so every exhausted retry raised
  ``KeyError`` inside the exception handler.
* All task templates use the canonical ``code_id`` / ``resource`` keys; the
  uploaded version mixed in malformed entries (``code_Id`` plus flat
  CPU/MEMORY/NPU keys) that crashed ``load_tasks_to_queue``.
* ``TaskSubmitThread`` no longer busy-spins when there is nothing to submit
  (the idle wait was commented out), terminates once every task is in a
  terminal state, and has a working ``stop()`` so ``join()`` in main returns.
"""

import concurrent.futures
import json
import logging
import os
import threading
import time
from typing import Dict, List, Optional
from uuid import uuid4

# `requests` is a hard runtime dependency of the API helpers below.  The
# guarded import keeps the module importable where it is not installed
# (e.g. for unit-testing the pure helpers); any real API call would then
# fail loudly with an AttributeError on `requests`.
try:
    import requests
except ImportError:  # pragma: no cover
    requests = None

# -------------------------- Global configuration --------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Task life-cycle states.  Values are the user-facing Chinese labels from the
# original upload and must stay byte-identical (they appear in logs/UI).
TASK_STATUS = {
    "SUBMITTED": "待提交",            # initial state, waiting to be submitted
    "SUBMITTING": "提交中",           # submission in progress
    "SUCCEED": "提交成功",            # confirmed successful
    "FAILED": "提交失败",             # failed, may still be retried
    "RETRY_EXHAUSTED": "重试次数耗尽",  # failed and over the retry threshold
}

# Global task registry (key = target_id, value = TaskInfo dict).
task_map: Dict[str, Dict] = {}
task_map_lock = threading.Lock()  # guards task_map and mutation of its tasks

_API_BASE = "http://119.45.255.234:30180"

# Endpoint configuration: URL + per-request timeout in seconds.
API_CONFIG = {
    "login": {"url": f"{_API_BASE}/jcc-admin/admin/login", "timeout": 10},
    "create_package": {"url": f"{_API_BASE}/jsm/jobSet/createPackage", "timeout": 15},
    "upload_file": {"url": f"{_API_BASE}/jcs/object/upload", "timeout": 300},
    "notify_upload": {"url": f"{_API_BASE}/jsm/jobSet/notifyUploaded", "timeout": 15},
    "bind_cluster": {"url": f"{_API_BASE}/jsm/jobSet/binding", "timeout": 15},
    "query_binding": {"url": f"{_API_BASE}/jsm/jobSet/queryBinding", "timeout": 15},
    "submit_task": {"url": f"{_API_BASE}/jsm/jobSet/submit", "timeout": 15},
    # Task-details query endpoint (used by the monitor thread).
    "task_details": {"url": f"{_API_BASE}/pcm/v1/core/task/details", "timeout": 15},
}

# Cluster resource bookkeeping (key = cluster ID, value = total/available).
cluster_resources: Dict[str, Dict] = {
    "1790300942428540928": {  # modelarts cluster
        "total": {"CPU": 96, "MEMORY": 1024, "NPU": 2},
        "available": {"CPU": 48, "MEMORY": 512, "NPU": 1}
    },
    "1865927992266461184": {  # openi cluster
        "total": {"CPU": 48, "MEMORY": 512, "DCU": 1},
        "available": {"CPU": 24, "MEMORY": 256, "DCU": 1}
    },
    "1865927992266462181": {  # octopus cluster
        "total": {"CPU": 48, "MEMORY": 512, "DCU": 1},
        "available": {"CPU": 24, "MEMORY": 256, "DCU": 1}
    },
    "1777240145309732864": {  # sugon cluster
        "total": {"CPU": 48, "MEMORY": 512, "NPU": 1},
        "available": {"CPU": 24, "MEMORY": 256, "NPU": 1}
    },
}
cluster_lock = threading.Lock()  # guards cluster_resources


# -------------------------- Data structures --------------------------
class DatasetInfo(dict):
    """Dataset record (dict-backed so it serialises trivially)."""

    def __init__(self, file_location: str, name: str, size: float, **kwargs):
        super().__init__()
        self["file_location"] = file_location          # local path (primary key)
        self["id"] = kwargs.get("id", str(uuid4()))    # unique dataset id
        self["name"] = name                            # dataset name
        self["size"] = size                            # size in bytes
        self["is_uploaded"] = kwargs.get("is_uploaded", False)  # upload done?
        self["upload_cluster"] = kwargs.get("upload_cluster", [])  # target clusters
        self["upload_time"] = kwargs.get("upload_time")  # upload timestamp
        self["description"] = kwargs.get("description")  # free-text description


class AlgorithmInfo(dict):
    """Algorithm record."""

    def __init__(self, cluster: str, id: str, name: str, **kwargs):
        super().__init__()
        self["cluster"] = cluster                  # owning cluster
        self["id"] = id                            # unique algorithm id
        self["son_id"] = kwargs.get("son_id", "")  # child algorithm id
        self["name"] = name                        # algorithm name


class TaskInfo(dict):
    """Task record; ``success_time`` is filled in when the task succeeds."""

    def __init__(self, task_name: str, dataset_name: str, code_id: str, resource: Dict, **kwargs):
        super().__init__()
        self["target_id"] = kwargs.get("target_id", str(uuid4()))  # unique task id
        self["task_name"] = task_name                  # display name
        self["package_name"] = kwargs.get("package_name", f"{task_name.lower()}-pkg")  # dataset folder name
        self["dataset_name"] = dataset_name            # associated dataset file
        self["code_id"] = code_id                      # algorithm id
        self["son_code_id"] = ""                       # child algorithm id, filled at submit time
        self["resource"] = resource                    # demand, e.g. {"CPU": .., "MEMORY": .., "NPU": ..}
        self["status"] = TASK_STATUS["SUBMITTED"]      # starts as pending
        self["submit_time"] = kwargs.get("submit_time", time.strftime("%Y-%m-%d %H:%M:%S"))
        self["success_time"] = None                    # filled on success
        self["third_party_task_id"] = ""               # filled after submission
        self["file_location"] = kwargs.get("file_location", "")  # local dataset dir
        self["error_msg"] = ""                         # last error description
        self["fail_count"] = 0                         # number of failed attempts
        self["max_fail_threshold"] = kwargs.get("max_fail_threshold", 3)  # retry limit
        self["cluster_id"] = ""                        # filled at submit time


# -------------------------- Utility helpers --------------------------
_TASK_NAME_TEMPLATE = "{prefix}-jointCloudAi-trainingtask"
_DEFAULT_RESOURCE = {"CPU": 24, "MEMORY": 256, "NPU": 1}

# (prefix, dataset_name, code_id, file_location).
# NOTE(review): the AF..AK entries appear twice, exactly as in the original
# upload — confirm whether the duplication is intentional.
_TASK_SPECS = [
    ("AA", "data1.zip", "1164", "D:/数据集/cnn数据集/data1/"),
    ("AB", "cifar-10-python.tar.gz", "1", "D:/数据集/cnn数据集/data2/"),
    ("AC", "cifar-100-python.tar.gz", "1", "D:/数据集/cnn数据集/data3/"),
    ("AD", "dev.jsonl", "2", "D:/数据集/transfomer数据集/BoolQ/"),
    ("AE", "dev.jsonl", "1", "D:/数据集/transfomer数据集/BoolQ/"),
    ("AF", "ceval.zip", "1", "D:/数据集/transfomer数据集/CEval/"),
    ("AG", "CMMLU.zip", "1", "D:/数据集/transfomer数据集/CMMLU/"),
    ("AH", "mental_health.csv", "1", "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/"),
    ("AI", "GSM8K.jsonl", "1", "D:/数据集/transfomer数据集/GSM8K/GSM8K/"),
    ("AJ", "human-eval.jsonl", "1", "D:/数据集/transfomer数据集/HumanEval/"),
    ("AK", "HumanEval_X.zip", "1", "D:/数据集/transfomer数据集/HumanEval_X/"),
    ("AF", "ceval.zip", "1", "D:/数据集/transfomer数据集/CEval/"),
    ("AG", "CMMLU.zip", "1", "D:/数据集/transfomer数据集/CMMLU/"),
    ("AH", "mental_health.csv", "1", "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/"),
    ("AI", "GSM8K.jsonl", "1", "D:/数据集/transfomer数据集/GSM8K/GSM8K/"),
    ("AJ", "human-eval.jsonl", "1", "D:/数据集/transfomer数据集/HumanEval/"),
    ("AK", "HumanEval_X.zip", "1", "D:/数据集/transfomer数据集/HumanEval_X/"),
]


def generate_task_templates() -> List[Dict]:
    """Build the static task templates.

    Every template carries the canonical ``code_id`` / ``resource`` keys that
    ``load_tasks_to_queue`` expects.  (The uploaded version mixed in malformed
    ``code_Id`` + flat CPU/MEMORY/NPU entries, which raised ``KeyError``.)

    Returns:
        A list of template dicts, one per entry in ``_TASK_SPECS``.
    """
    return [
        {
            "task_name_template": _TASK_NAME_TEMPLATE,
            "prefix": prefix,
            "dataset_name": dataset_name,
            "code_id": code_id,
            "file_location": file_location,
            "resource": dict(_DEFAULT_RESOURCE),  # fresh copy: tasks may mutate it
        }
        for prefix, dataset_name, code_id, file_location in _TASK_SPECS
    ]


def load_tasks_to_queue(templates: List[Dict]) -> None:
    """Reset ``task_map`` and enqueue one ``TaskInfo`` per template."""
    with task_map_lock:
        task_map.clear()
        for template in templates:
            task_name = template["task_name_template"].format(prefix=template["prefix"])
            task = TaskInfo(
                task_name=task_name,
                dataset_name=template["dataset_name"],
                code_id=template["code_id"],
                resource=template["resource"],
                file_location=template["file_location"]
            )
            task_map[task["target_id"]] = task
            logger.info(f"任务入队 | task_name: {task_name} | target_id: {task['target_id']}")
    logger.info(f"任务队列加载完成 | 共 {len(task_map)} 个任务")


def select_cluster(task_resource: Dict) -> Optional[str]:
    """Return the ID of the first cluster whose available resources cover
    every requested resource type, or None when no cluster qualifies.

    NOTE(review): resources are only checked, never reserved/decremented, so
    concurrent submissions may over-commit a cluster — confirm intent.
    """
    with cluster_lock:
        for cluster_id, cluster in cluster_resources.items():
            available = cluster["available"]
            # Accelerator types (NPU/DCU) must match by the same key name.
            if all(available.get(res_type, 0) >= required
                   for res_type, required in task_resource.items()):
                return cluster_id
    logger.warning(f"无满足资源需求的集群 | 任务需求: {task_resource}")
    return None


# -------------------------- API helpers --------------------------
def get_son_code_id(cluster_id: str, code_id: str) -> str:
    """Map (cluster, algorithm) to its child-algorithm id (mocked lookup).

    Unknown combinations fall back to ``"<code_id>-default"``.
    """
    son_code_map = {
        ("1790300942428540928", "1"): "1-1",
        ("1790300942428540928", "2"): "2-1",
        ("1777240145309732864", "1"): "1-2",
        ("1865927992266461184", "2"): "2-2"
    }
    return son_code_map.get((cluster_id, code_id), f"{code_id}-default")


def get_token() -> Optional[str]:
    """Log in to the admin endpoint and return the auth token.

    Returns None on HTTP errors, request exceptions, or an unexpected
    response shape; details are logged.
    """
    login_payload = {"username": "admin", "password": "Nudt@123"}
    try:
        config = API_CONFIG["login"]
        response = requests.post(config["url"], json=login_payload, timeout=config["timeout"])
        response.raise_for_status()
        result = response.json()
        if result.get("code") == 200 and "data" in result and "token" in result["data"]:
            logger.info("Token获取成功")
            return result["data"]["token"]
        logger.error(f"Token获取失败 | 响应: {result}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"登录请求异常: {str(e)}", exc_info=True)
        return None


def submit_single_task(task: Dict) -> bool:
    """Run the full submission flow for one task.

    On success the task is marked SUCCEED and the third-party job-set id is
    stored.  On failure ``fail_count`` is bumped and the task is marked FAILED
    (or RETRY_EXHAUSTED once the threshold is reached).

    Returns:
        True when the job set was submitted, False otherwise.

    NOTE(review): the payloads below hard-code cluster "1790300942428540928"
    instead of using task["cluster_id"]; kept as-is because the binding /
    image ids appear to be tied to that cluster — confirm before generalising.
    """
    token = get_token()
    if not token:
        with task_map_lock:
            task["status"] = TASK_STATUS["FAILED"]
            task["error_msg"] = "获取Token失败"
        return False

    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}'
    }
    task_name = task["task_name"]
    package_name = task["package_name"]
    file_name = task["dataset_name"]
    file_location = task["file_location"]
    code_id = task["code_id"]
    son_code_id = get_son_code_id(task["cluster_id"], code_id)  # uses the selected cluster
    file_path = os.path.join(file_location, file_name)

    try:
        # Step 1: create the dataset package (folder).
        config = API_CONFIG["create_package"]
        create_payload = {
            "userID": 5,
            "name": package_name,
            "dataType": "dataset",
            "packageID": 0,
            "uploadPriority": {"type": "specify", "clusters": ["1790300942428540928"]},
            "bindingInfo": {
                "clusterIDs": ["1790300942428540928"],
                "name": package_name,
                "category": "image",
                "type": "dataset",
                "imageID": "",
                "bias": [],
                "region": [],
                "chip": ["ASCEND"],
                "selectedCluster": [],
                "modelType": "",
                "env": "",
                "version": "",
                "packageID": 0,
                "points": 0
            }
        }
        create_resp = requests.post(config["url"], json=create_payload, headers=headers, timeout=config["timeout"])
        create_resp.raise_for_status()
        create_result = create_resp.json()
        if create_result.get("code") != 200:
            raise ValueError(f"创建文件夹失败 | API返回: {create_result}")
        packageID = create_result["data"]["newPackage"]["packageID"]
        logger.info(f"[{task_name}] 第一步:创建文件夹成功 | packageID: {packageID}")

        # Step 3 (original numbering kept in the log strings): upload the file.
        config = API_CONFIG["upload_file"]
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据集文件不存在 | path: {file_path}")

        info_data = {
            "userID": 5,
            "packageID": packageID,
            "loadTo": [3],
            "loadToPath": [f"/dataset/5/{package_name}/"]
        }
        # multipart/form-data: requests sets Content-Type itself, so only the
        # auth header is passed here.
        file_headers = {'Authorization': f'Bearer {token}'}
        with open(file_path, 'rb') as f:
            form_data = {"info": (None, json.dumps(info_data)), "files": f}
            upload_resp = requests.post(config["url"], files=form_data, headers=file_headers, timeout=config["timeout"])
        upload_resp.raise_for_status()
        upload_result = upload_resp.json()
        if upload_result.get("code") != 200:
            raise ValueError(f"文件上传失败 | API返回: {upload_result}")
        object_id = upload_result["data"]["uploadeds"][0]["objectID"]
        logger.info(f"[{task_name}] 第三步:文件上传成功 | objectID: {object_id}")

        # Step 4: notify the platform that the upload finished.
        config = API_CONFIG["notify_upload"]
        notify_payload = {
            "userID": 5,
            "packageID": packageID,
            "uploadParams": {
                "dataType": "dataset",
                "uploadInfo": {"type": "local", "localPath": file_name, "objectIDs": [object_id]}
            }
        }
        notify_resp = requests.post(config["url"], json=notify_payload, headers=headers, timeout=config["timeout"])
        notify_resp.raise_for_status()
        notify_result = notify_resp.json()
        if notify_result.get("code") != 200:
            raise ValueError(f"通知上传完成失败 | API返回: {notify_result}")
        logger.info(f"[{task_name}] 第四步:通知上传完成成功")

        # Step 7: bind the dataset package to the cluster.
        config = API_CONFIG["bind_cluster"]
        bind_payload = {
            "userID": 5,
            "info": {"type": "dataset", "packageID": packageID, "clusterIDs": ["1790300942428540928"]}
        }
        bind_resp = requests.post(config["url"], json=bind_payload, headers=headers, timeout=config["timeout"])
        bind_resp.raise_for_status()
        bind_result = bind_resp.json()
        if bind_result.get("code") != 200:
            raise ValueError(f"绑定集群失败 | API返回: {bind_result}")
        logger.info(f"[{task_name}] 第七步:数据集绑定集群成功")

        # Step 8: look up the binding id of the freshly bound package.
        config = API_CONFIG["query_binding"]
        query_bind_payload = {
            "dataType": "dataset",
            "param": {"userID": 5, "bindingID": -1, "type": "private"}
        }
        query_bind_resp = requests.post(config["url"], json=query_bind_payload, headers=headers, timeout=config["timeout"]).json()
        if query_bind_resp.get("code") != 200:
            raise ValueError(f"查询绑定失败 | API返回: {query_bind_resp}")

        binding_id = None
        for data in query_bind_resp["data"]["datas"]:
            if data["info"]["name"] == package_name:
                binding_id = data["ID"]
                break
        # Explicit None check: a binding id of 0 would be falsy but valid.
        if binding_id is None:
            raise ValueError(f"未找到package_name={package_name}的绑定ID")
        logger.info(f"[{task_name}] 第八步:获取绑定ID成功 | target_id: {binding_id}")

        # Step 9: submit the training job set.
        config = API_CONFIG["submit_task"]
        task_res = task["resource"]
        submit_payload = {
            "userID": 5,
            "jobSetInfo": {
                "jobs": [
                    {
                        "localJobID": "1",
                        "name": task_name,
                        "description": "自动提交的CNN训练任务",
                        "type": "AI",
                        "files": {
                            "dataset": {"type": "Binding", "bindingID": binding_id},
                            "model": {"type": "Binding", "bindingID": 421},
                            "image": {"type": "Image", "imageID": 11}
                        },
                        "jobResources": {
                            "scheduleStrategy": "dataLocality",
                            "clusters": [
                                {
                                    "clusterID": "1790300942428540928",
                                    "runtime": {"envs": {}, "params": {}},
                                    "code": {"type": "Binding", "bindingID": son_code_id},
                                    "resources": [
                                        {"type": "CPU", "name": "ARM", "number": task_res["CPU"]},
                                        {"type": "MEMORY", "name": "RAM", "number": task_res["MEMORY"]},
                                        {"type": "MEMORY", "name": "VRAM", "number": 32},
                                        {"type": "STORAGE", "name": "DISK", "number": 32},
                                        {"type": "NPU", "name": "ASCEND910", "number": task_res.get("NPU", 0)}
                                    ]
                                }
                            ]
                        }
                    },
                    {"localJobID": "4", "type": "DataReturn", "targetLocalJobID": "1"}
                ]
            }
        }
        submit_resp = requests.post(config["url"], json=submit_payload, headers=headers, timeout=config["timeout"]).json()
        if submit_resp.get("code") != 200:
            raise ValueError(f"任务提交失败 | API返回: {submit_resp}")

        third_party_task_id = submit_resp.get('data', {}).get('jobSetID')
        logger.info(f"[{task_name}] 第九步:任务提交成功 | 第三方任务ID: {third_party_task_id}")

        # Mark success (thread-safe).  NOTE(review): status jumps straight to
        # SUCCEED here, so the monitor thread's SUBMITTING polling only kicks
        # in if this is ever changed to leave the task in SUBMITTING.
        with task_map_lock:
            task["status"] = TASK_STATUS["SUCCEED"]
            task["third_party_task_id"] = third_party_task_id
        return True

    except Exception as e:
        error_msg = f"提交失败: {str(e)}"
        with task_map_lock:
            task["fail_count"] += 1
            if task["fail_count"] >= task["max_fail_threshold"]:
                # RETRY_EXHAUSTED is now a defined status (the uploaded
                # version raised KeyError on this line).
                task["status"] = TASK_STATUS["RETRY_EXHAUSTED"]
            else:
                task["status"] = TASK_STATUS["FAILED"]  # below threshold: retryable
            task["error_msg"] = error_msg
        logger.error(f"[{task_name}] {error_msg}", exc_info=True)
        return False


def query_third_party_task_status(third_party_task_id: str) -> Optional[str]:
    """Query the joint-cloud platform for a task's status.

    Returns the ``status`` of the first entry in ``data.subTaskInfos``, or
    None when the id is empty, the request fails, the response code is not
    200, or the response carries no sub-task data.
    """
    if not third_party_task_id:
        logger.warning("第三方任务ID为空,无法查询状态")
        return None

    try:
        config = API_CONFIG["task_details"]
        params = {"id": third_party_task_id}

        # The details endpoint may require authentication as well.
        token = get_token()
        headers = {"Authorization": f"Bearer {token}"} if token else {}

        response = requests.get(
            config["url"],
            params=params,
            headers=headers,
            timeout=config["timeout"]
        )
        response.raise_for_status()
        result = response.json()

        if result.get("code") != 200:
            logger.error(f"查询任务状态失败 | 任务ID: {third_party_task_id} | 响应: {result}")
            return None

        sub_task_infos = result.get("data", {}).get("subTaskInfos", [])
        if not sub_task_infos:
            logger.warning(f"任务 {third_party_task_id} 未找到subTaskInfos数据")
            return None

        return sub_task_infos[0].get("status")

    except requests.exceptions.RequestException as e:
        logger.error(f"查询任务状态请求异常 | 任务ID: {third_party_task_id} | 错误: {str(e)}", exc_info=True)
        return None
    except (KeyError, IndexError) as e:
        logger.error(f"解析任务状态响应失败 | 任务ID: {third_party_task_id} | 错误: {str(e)}", exc_info=True)
        return None


# -------------------------- Thread 1: task monitor --------------------------
class TaskMonitorThread(threading.Thread):
    """Polls in-flight (SUBMITTING) tasks and updates their status; stops
    itself once every task is terminal."""

    def __init__(self, check_interval: int = 10):
        super().__init__(name="TaskMonitorThread")
        self.check_interval = check_interval  # polling interval, seconds
        self._stop_event = threading.Event()

    def run(self) -> None:
        logger.info(f"监控线程启动 | 监控间隔: {self.check_interval}秒")
        while not self._stop_event.is_set():
            with task_map_lock:
                tasks = list(task_map.values())  # snapshot; don't hold the lock while polling

            for task in tasks:
                with task_map_lock:
                    current_status = task["status"]

                # Only SUBMITTING tasks are polled; every other status
                # (SUBMITTED / SUCCEED / FAILED / RETRY_EXHAUSTED) is either
                # handled by the submit thread or already terminal.
                if current_status != TASK_STATUS["SUBMITTING"]:
                    continue

                if not task["third_party_task_id"]:
                    logger.warning(f"任务 {task['task_name']} 无第三方ID,跳过状态查询")
                    continue

                # Ask the third-party platform for the current status.
                third_status = query_third_party_task_status(task["third_party_task_id"])
                with task_map_lock:
                    if third_status == "SUCCEEDED":
                        task["status"] = TASK_STATUS["SUCCEED"]
                        task["success_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
                        logger.info(
                            f"任务状态更新 | task_name: {task['task_name']} | 提交成功 | 成功时间: {task['success_time']}")
                    elif third_status == "FAILED":
                        task["status"] = TASK_STATUS["FAILED"]
                        task["fail_count"] += 1
                        task["error_msg"] = f"第三方任务执行失败(第{task['fail_count']}次)"
                        logger.warning(
                            f"任务状态更新 | task_name: {task['task_name']} | 提交失败 | 失败次数: {task['fail_count']}/{task['max_fail_threshold']}")
                    # Any other status (still running / unknown): keep SUBMITTING.

            if self._check_all_tasks_completed():
                logger.info("所有任务已完成(成功或失败次数超过阈值)")
                self.stop()

            self._stop_event.wait(self.check_interval)
        logger.info("监控线程结束")

    def _check_all_tasks_completed(self) -> bool:
        """True when every task is terminal (succeeded, or failed at/over
        its retry threshold)."""
        with task_map_lock:
            for task in task_map.values():
                if task["status"] in (TASK_STATUS["SUBMITTED"], TASK_STATUS["SUBMITTING"]):
                    return False
                if task["status"] == TASK_STATUS["FAILED"] and task["fail_count"] < task["max_fail_threshold"]:
                    return False
        return True

    def stop(self) -> None:
        """Request the monitor loop to exit."""
        self._stop_event.set()


# -------------------------- Thread 2: task submitter --------------------------
class TaskSubmitThread(threading.Thread):
    """Submits pending tasks (and retries failed ones under the threshold)
    with a bounded thread pool; exits once every task is terminal."""

    def __init__(self, max_workers: int = 3):
        super().__init__(name="TaskSubmitThread")
        self.max_workers = max_workers  # concurrent submissions
        self._stop_event = threading.Event()

    def run(self) -> None:
        logger.info(f"提交线程启动 | 并发数: {self.max_workers}")
        while not self._stop_event.is_set():
            # 1. Collect submittable tasks: pending, or failed under threshold.
            with task_map_lock:
                pending_tasks = []
                for task in task_map.values():
                    status = task["status"]
                    if status == TASK_STATUS["SUBMITTED"]:
                        pending_tasks.append(task)
                    elif status == TASK_STATUS["FAILED"]:
                        if task["fail_count"] < task["max_fail_threshold"]:
                            pending_tasks.append(task)
                        else:
                            logger.info(
                                f"任务 {task['task_name']} 失败次数超阈值({task['max_fail_threshold']}),停止提交")

            if not pending_tasks:
                # Exit once no task can ever become submittable again;
                # otherwise sleep instead of busy-spinning (the uploaded
                # version looped flat-out here and never terminated).
                if self._all_tasks_terminal():
                    break
                self._stop_event.wait(5)
                continue

            # 2. Submit concurrently.
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = {executor.submit(self.commit_task, task): task for task in pending_tasks}
                for future in concurrent.futures.as_completed(futures):
                    task = futures[future]
                    try:
                        future.result()
                    except Exception as e:
                        # commit_task handles its own errors; this guards
                        # against truly unexpected failures.
                        with task_map_lock:
                            task["status"] = TASK_STATUS["FAILED"]
                            task["fail_count"] += 1
                            task["error_msg"] = f"提交过程异常: {str(e)}"
                        logger.error(f"任务提交异常 | task_name: {task['task_name']} | 错误: {str(e)}")

        logger.info("提交线程结束")

    def _all_tasks_terminal(self) -> bool:
        """True when no task is pending/in-flight and no failed task has
        retries left."""
        with task_map_lock:
            for task in task_map.values():
                if task["status"] in (TASK_STATUS["SUBMITTED"], TASK_STATUS["SUBMITTING"]):
                    return False
                if task["status"] == TASK_STATUS["FAILED"] and task["fail_count"] < task["max_fail_threshold"]:
                    return False
        return True

    def commit_task(self, task: Dict) -> None:
        """Pick a cluster for *task*, then run the submission flow."""
        # 1. Select a cluster that can host the task.
        cluster_id = select_cluster(task["resource"])
        if not cluster_id:
            with task_map_lock:
                task["status"] = TASK_STATUS["FAILED"]
                task["fail_count"] += 1
                task["error_msg"] = "无可用集群"
            logger.error(f"[{task['task_name']}] 提交失败:无可用集群")
            return

        # 2. Mark as in-flight and record the cluster.
        with task_map_lock:
            task["status"] = TASK_STATUS["SUBMITTING"]
            task["cluster_id"] = cluster_id
        logger.info(f"[{task['task_name']}] 开始提交至集群 {cluster_id}")

        # 3. Drive the API flow; failures are recorded on the task itself.
        if not submit_single_task(task):
            logger.warning(f"[{task['task_name']}] 提交失败,等待重试(当前失败次数:{task['fail_count']})")

    def stop(self) -> None:
        """Request the submit loop to exit."""
        self._stop_event.set()


# -------------------------- Main --------------------------
if __name__ == "__main__":
    # 1. Build the static task templates.
    task_templates = generate_task_templates()

    # 2. Load them into the task queue.
    load_tasks_to_queue(task_templates)

    # 3. Start the monitor thread.
    monitor_thread = TaskMonitorThread(check_interval=10)
    monitor_thread.start()

    # 4. Start the submit thread.
    submit_thread = TaskSubmitThread(max_workers=3)
    submit_thread.start()

    # 5. The monitor stops itself once every task is terminal; then stop the
    # submit thread (its own terminal check would also exit it) and wait.
    monitor_thread.join()
    submit_thread.stop()
    submit_thread.join()
    logger.info("所有任务处理完毕,程序退出")