"""Batch-submit federated-cloud AI training tasks.

Pipeline:
    1. ``generate_tasks()``  - build static task templates.
    2. ``read_tasks()``      - turn templates into retry-aware task dicts in the
                               global ``task_map``.
    3. ``TaskSubmitThread``  - concurrently submit tasks whose resource demand
                               fits the target cluster's available resources.
    4. ``TaskMonitorThread`` - poll task states, re-queue failed tasks until
                               ``max_retries`` is exhausted, then signal completion.
"""

import concurrent.futures
import json
import logging
import os
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional
from uuid import uuid4

# NOTE: the third-party ``requests`` package is imported lazily inside the
# functions that actually perform HTTP calls (get_token / submit_single_task),
# so the module can be imported and unit-tested without it being installed.

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Global task registry (key = target_id, value = task detail dict).
task_map: Dict[str, Dict] = {}
# Lock guarding every read/write of ``task_map`` and of task status fields.
task_map_lock = threading.Lock()

# Cluster that datasets are bound to and jobs are submitted to (modelarts).
TARGET_CLUSTER_ID = "1790300942428540928"

# Known cluster resources: per cluster, total capacity and currently available.
cluster_resources: Dict[str, Dict] = {
    # modelarts cluster (ID: 1790300942428540928)
    "1790300942428540928": {
        "total": {"CPU": 96, "MEMORY": 1024, "NPU": 4},
        "available": {"CPU": 48, "MEMORY": 512, "NPU": 2}
    },
    # openi (cluster ID: 1865927992266461184)
    "1865927992266461184": {
        "total": {"CPU": 48, "MEMORY": 512, "NPU": 1},
        "available": {"CPU": 24, "MEMORY": 256, "NPU": 1}
    },
    # octopus-189 (octopus cluster, ID: 1865927992266462181)
    "1865927992266462181": {
        "total": {"CPU": 48, "MEMORY": 512, "NPU": 1},
        "available": {"CPU": 24, "MEMORY": 256, "NPU": 1}
    },
    # shuguangAi (Sugon cluster, ID: 1777240145309732864)
    "1777240145309732864": {
        "total": {"CPU": 48, "MEMORY": 512, "NPU": 1},
        "available": {"CPU": 24, "MEMORY": 256, "NPU": 1}
    },
}


class DatasetInfo(Dict):
    """Schema annotations for a dataset record (dict-backed)."""
    id: str                         # unique dataset identifier
    name: str                       # dataset name
    size: float                     # dataset size in bytes
    status: str                     # e.g. "uploaded"
    is_uploaded: bool               # whether the upload finished
    upload_time: Optional[datetime] # None when not yet uploaded
    description: Optional[str]      # optional free-form description


class AlgorithmInfo(Dict):
    """Schema annotations for an algorithm record (dict-backed)."""
    id: str                         # unique algorithm identifier
    name: str                       # algorithm name
    size: float                     # package size in bytes
    status: str                     # e.g. "uploaded"
    is_uploaded: bool               # whether the upload finished
    upload_time: Optional[datetime] # None when not yet uploaded
    description: Optional[str]      # optional free-form description


class ClusterDatas(Dict):
    """Per-cluster data holdings: datasets and algorithms keyed by their ID."""
    datasets: Dict[str, DatasetInfo]
    algorithms: Dict[str, AlgorithmInfo]


class TaskTemplate(Dict):
    """Cluster task template (inherits Dict: supports both item and attribute access)."""
    # Core identity
    target_id: str            # unique task id
    task_name: str            # task name
    package_name: str         # dataset package name
    # Associated resources
    dataset_name: str         # bound dataset file name
    code_Id: str              # bound algorithm/code binding id (legacy field name kept)
    # Resource demand: keys "CPU"/"MEMORY"/"NPU" -> int
    resource: Dict[str, int]
    # State and timing
    status: str               # e.g. "submitted" / "succeed" / "failed"
    submit_time: str          # "%Y-%m-%d %H:%M:%S"
    # Extras
    file_location: str        # local directory holding the dataset file
    error_msg: str            # empty string means no error
    retry_count: int          # retries performed so far
    max_retries: int          # retry budget

    def __init__(self, **kwargs):
        defaults = {
            "target_id": "",
            "task_name": "",
            "package_name": "",
            "dataset_name": "",
            "code_Id": "",
            "resource": {"CPU": 0, "MEMORY": 0, "NPU": 0},
            "status": "submitted",   # initial state: waiting for submission
            "submit_time": "",
            "file_location": "",
            "error_msg": "",
            "retry_count": 0,
            "max_retries": 5,
        }
        # BUGFIX: the original accepted **kwargs but silently discarded them;
        # caller-supplied values now override the defaults.
        defaults.update(kwargs)
        # Dict init keeps item access (task["status"]) working.
        super().__init__(defaults)
        # Mirror fields as attributes so dot access (task.status) works too.
        for key, value in defaults.items():
            setattr(self, key, value)


class TaskMonitorThread(threading.Thread):
    """Monitor thread: polls task states and re-queues retryable tasks."""

    def __init__(self, check_interval: int = 30, name: Optional[str] = None):
        super().__init__(name=name or "TaskMonitorThread")
        self.check_interval = check_interval          # polling interval (seconds)
        self._stop_event = threading.Event()
        self.all_tasks_completed = threading.Event()  # set when every task reaches a final state

    def run(self) -> None:
        logger.info(f"监控线程启动 | 轮询间隔: {self.check_interval}秒 | 线程ID: {self.ident}")
        while not self._stop_event.is_set():
            with task_map_lock:
                all_completed = True   # assume done until a pending task is found
                retry_tasks = []       # tasks to reset for another attempt

                for task in task_map.values():
                    status = task["status"]
                    if status in ["succeed"]:
                        # Final success state: nothing to do.
                        continue
                    if status in ["failed", "error"]:
                        if task["retry_count"] < task["max_retries"]:
                            retry_tasks.append(task)
                            all_completed = False
                        else:
                            # Retry budget exhausted: terminal failure state.
                            task["status"] = "retry_exhausted"
                            logger.warning(f"任务 {task['task_name']} 达到最大重试次数({task['max_retries']}),停止重试")
                            all_completed = False
                    else:
                        # Any other state (e.g. "submitted") is still in flight.
                        all_completed = False

                if retry_tasks:
                    logger.info(f"发现 {len(retry_tasks)} 个可重试任务,重置状态为submitted")
                    for task in retry_tasks:
                        task["status"] = "submitted"
                        task["retry_count"] += 1
                        logger.info(
                            f"任务 {task['task_name']} (target_id: {task['target_id']}) | "
                            f"重试次数: {task['retry_count']}/{task['max_retries']} | 状态已重置"
                        )

                if all_completed:
                    # Every task is terminal (succeed or retry_exhausted):
                    # signal the submitter and exit.
                    logger.info("所有任务已进入最终状态(成功或重试耗尽)")
                    self.all_tasks_completed.set()
                    break

            # Sleep until the next poll, but wake immediately on stop().
            self._stop_event.wait(self.check_interval)
        logger.info(f"监控线程结束 | 线程ID: {self.ident}")

    def stop(self) -> None:
        """Request the monitor thread to stop."""
        self._stop_event.set()


class TaskSubmitThread(threading.Thread):
    """Submitter thread: repeatedly submits pending tasks until the monitor signals completion."""

    def __init__(self, max_workers: int = 3,
                 monitor_thread: Optional[TaskMonitorThread] = None,
                 name: Optional[str] = None):
        super().__init__(name=name or "TaskSubmitThread")
        self.max_workers = max_workers
        # BUGFIX: the original signature had no ``monitor_thread`` parameter and
        # read an undefined global, so ``TaskSubmitThread(..., monitor_thread=...)``
        # in __main__ raised TypeError. It is now a proper constructor argument.
        self.monitor_thread = monitor_thread
        self._stop_event = threading.Event()

    def run(self) -> None:
        logger.info(f"任务提交线程启动 | 并发数: {self.max_workers} | 线程ID: {self.ident}")
        try:
            while not self._stop_event.is_set() and not self.monitor_thread.all_tasks_completed.is_set():
                # 1. Query cluster resources.
                cluster_info = search_resource()
                if not cluster_info:
                    logger.error("未获取到集群资源信息,10秒后重试...")
                    self._stop_event.wait(10)
                    continue

                # BUGFIX: commit_tasks expects a flat {"CPU","MEMORY","NPU"} dict,
                # but search_resource() returns a per-cluster map; select the
                # target cluster's "available" slice.
                available = cluster_info.get(TARGET_CLUSTER_ID, {}).get("available")
                if not available:
                    logger.error("未获取到集群资源信息,10秒后重试...")
                    self._stop_event.wait(10)
                    continue

                # 2. Submit every task whose demand fits the available resources.
                commit_tasks(available, self.max_workers)

                # 3. Back off before re-checking the queue (avoid hot-looping).
                self._stop_event.wait(5)

        except Exception as e:
            logger.error(f"任务提交线程异常终止: {str(e)}", exc_info=True)
        finally:
            logger.info(f"任务提交线程结束 | 线程ID: {self.ident}")

    def stop(self) -> None:
        """Request the submitter thread to stop."""
        self._stop_event.set()


def generate_tasks() -> List[Dict]:
    """Build the static list of task templates (one dict per training task)."""
    base_tasks = [
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AA",
            "dataset_name": "data1.zip",
            "code_Id": 1,
            "file_location": "D:/数据集/cnn数据集/data1/",
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AB",
            "dataset_name": "cifar-10-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data2/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AC",
            "dataset_name": "cifar-100-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data3/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AD",
            "dataset_name": "cifar-100-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data3/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AE",
            "dataset_name": "dev.jsonl",
            "file_location": "D:/数据集/transfomer数据集/BoolQ/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AF",
            "dataset_name": "ceval.zip",
            "file_location": "D:/数据集/transfomer数据集/CEval/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AG",
            "dataset_name": "CMMLU.zip",
            "file_location": "D:/数据集/transfomer数据集/CMMLU/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AH",
            "dataset_name": "mental_health.csv",
            "file_location": "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AI",
            "dataset_name": "GSM8K.jsonl",
            "file_location": "D:/数据集/transfomer数据集/GSM8K/GSM8K/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AJ",
            "dataset_name": "human-eval.jsonl",
            "file_location": "D:/数据集/transfomer数据集/HumanEval/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AK",
            "dataset_name": "HumanEval_X.zip",
            "file_location": "D:/数据集/transfomer数据集/HumanEval_X/",
            "code_Id": 1,
            "CPU": 24, "MEMORY": 256, "NPU": 1
        }
    ]
    logger.info(f"成功生成 {len(base_tasks)} 个任务模板")
    return base_tasks


def read_tasks(templates: List[Dict]) -> None:
    """Convert task templates into submittable task dicts (with retry fields).

    Populates the global ``task_map`` in place; templates missing required
    keys are logged and skipped rather than aborting the whole batch.
    """
    with task_map_lock:
        # Clear in place (not ``task_map = {}``) so any reference another
        # thread or module holds to task_map stays valid.
        task_map.clear()
        if not templates:
            logger.warning("任务模板为空,跳过任务创建")
            return

        for template in templates:
            try:
                target_id = str(uuid4())  # unique task id
                # Build a fresh dict per task; the original shallow-copied a
                # shared template, risking cross-task aliasing of nested dicts.
                task = {
                    "target_id": target_id,
                    "task_name": template["task_name_template"].format(prefix=template["prefix"]),
                    "package_name": f"{template['prefix'].lower()}-training-pkg",
                    "dataset_name": template["dataset_name"],
                    "code_Id": template["code_Id"],
                    "resource": {
                        "CPU": template["CPU"],
                        "MEMORY": template["MEMORY"],
                        "NPU": template["NPU"]
                    },
                    "status": "submitted",
                    "submit_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                    "file_location": template["file_location"],
                    "error_msg": "",
                    "retry_count": 0,
                    "max_retries": 5,
                }
                task_map[target_id] = task
                logger.info(f"任务创建成功 | task_name: {task['task_name']} | target_id: {target_id}")
            except KeyError as e:
                logger.error(f"任务模板缺少字段: {e} | 模板内容: {template}")
            except Exception as e:
                logger.error(f"创建任务失败: {str(e)} | 模板内容: {template}")


def search_resource() -> Dict[str, Dict]:
    """Return the global cluster-resource map (total and available per cluster)."""
    logger.info(f"查询到集群资源: {cluster_resources}")
    return cluster_resources


def get_token() -> Optional[str]:
    """Log in to the admin endpoint and return a bearer token, or None on failure."""
    import requests  # lazy third-party import (see module note)

    login_url = "http://119.45.255.234:30180/jcc-admin/admin/login"
    # SECURITY: hard-coded credentials — move to environment variables or a
    # secrets store before sharing or deploying this script.
    login_payload = {"username": "admin", "password": "Nudt@123"}
    try:
        response = requests.post(login_url, json=login_payload, timeout=10)
        response.raise_for_status()
        result = response.json()
        if result.get("code") == 200 and "data" in result and "token" in result["data"]:
            logger.info("Token获取成功")
            return result["data"]["token"]
        logger.error(f"Token获取失败 | 响应: {result}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"登录请求异常: {str(e)}", exc_info=True)
        return None


def submit_single_task(task: Dict) -> bool:
    """Submit one task end-to-end (package create, upload, bind, submit).

    On failure the task's status is set to "failed" (or "retry_exhausted"
    once the retry budget is spent) and ``error_msg`` is recorded.
    Returns True on success, False otherwise.
    """
    import requests  # lazy third-party import (see module note)

    token = get_token()
    if not token:
        with task_map_lock:
            task["status"] = "failed"
            task["error_msg"] = "获取Token失败"
        return False

    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}'
    }
    task_name = task["task_name"]
    package_name = task["package_name"]
    file_name = task["dataset_name"]
    file_location = task["file_location"]
    son_code_Id = task["code_Id"]
    file_path = os.path.join(file_location, file_name)

    try:
        # Step 1: create the dataset package (folder) on the platform.
        create_url = "http://119.45.255.234:30180/jsm/jobSet/createPackage"
        create_payload = {
            "userID": 5,
            "name": package_name,
            "dataType": "dataset",
            "packageID": 0,
            "uploadPriority": {"type": "specify", "clusters": [TARGET_CLUSTER_ID]},
            "bindingInfo": {
                "clusterIDs": [TARGET_CLUSTER_ID],
                "name": package_name,
                "category": "image",
                "type": "dataset",
                "imageID": "",
                "bias": [],
                "region": [],
                "chip": ["ASCEND"],
                "selectedCluster": [],
                "modelType": "",
                "env": "",
                "version": "",
                "packageID": 0,
                "points": 0
            }
        }
        create_resp = requests.post(create_url, json=create_payload, headers=headers, timeout=15)
        create_resp.raise_for_status()
        create_result = create_resp.json()
        if create_result.get("code") != 200:
            raise ValueError(f"创建文件夹失败 | API返回: {create_result}")
        packageID = create_result["data"]["newPackage"]["packageID"]
        logger.info(f"[{task_name}] 第一步:创建文件夹成功 | packageID: {packageID}")

        # Step 3: upload the dataset file (multipart form).
        upload_url = "http://119.45.255.234:30180/jcs/object/upload"
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据集文件不存在 | path: {file_path}")

        info_data = {
            "userID": 5,
            "packageID": packageID,
            "loadTo": [3],
            "loadToPath": [f"/dataset/5/{package_name}/"]
        }
        # Content-Type must NOT be forced here: requests sets the multipart
        # boundary itself, so only the auth header is passed.
        file_headers = {'Authorization': f'Bearer {token}'}
        with open(file_path, 'rb') as f:
            form_data = {"info": (None, json.dumps(info_data)), "files": f}
            upload_resp = requests.post(upload_url, files=form_data, headers=file_headers, timeout=300)
        upload_resp.raise_for_status()
        upload_result = upload_resp.json()
        if upload_result.get("code") != 200:
            raise ValueError(f"文件上传失败 | API返回: {upload_result}")
        object_id = upload_result["data"]["uploadeds"][0]["objectID"]
        logger.info(f"[{task_name}] 第三步:文件上传成功 | objectID: {object_id}")

        # Step 4: notify the platform that the upload is complete.
        notify_url = "http://119.45.255.234:30180/jsm/jobSet/notifyUploaded"
        notify_payload = {
            "userID": 5,
            "packageID": packageID,
            "uploadParams": {
                "dataType": "dataset",
                "uploadInfo": {"type": "local", "localPath": file_name, "objectIDs": [object_id]}
            }
        }
        notify_resp = requests.post(notify_url, json=notify_payload, headers=headers, timeout=15)
        notify_resp.raise_for_status()
        notify_result = notify_resp.json()
        if notify_result.get("code") != 200:
            raise ValueError(f"通知上传完成失败 | API返回: {notify_result}")
        logger.info(f"[{task_name}] 第四步:通知上传完成成功")

        # Step 7: bind the dataset package to the target cluster.
        bind_url = "http://119.45.255.234:30180/jsm/jobSet/binding"
        bind_payload = {
            "userID": 5,
            "info": {"type": "dataset", "packageID": packageID, "clusterIDs": [TARGET_CLUSTER_ID]}
        }
        bind_resp = requests.post(bind_url, json=bind_payload, headers=headers, timeout=15)
        bind_resp.raise_for_status()
        bind_result = bind_resp.json()
        if bind_result.get("code") != 200:
            raise ValueError(f"绑定集群失败 | API返回: {bind_result}")
        logger.info(f"[{task_name}] 第七步:数据集绑定集群成功")

        # Step 8: query the binding list and locate this package's binding ID.
        query_bind_url = "http://119.45.255.234:30180/jsm/jobSet/queryBinding"
        query_bind_payload = {
            "dataType": "dataset",
            "param": {"userID": 5, "bindingID": -1, "type": "private"}
        }
        query_bind_resp = requests.post(query_bind_url, json=query_bind_payload, headers=headers, timeout=15).json()
        if query_bind_resp.get("code") != 200:
            raise ValueError(f"查询绑定失败 | API返回: {query_bind_resp}")

        target_id = None
        for data in query_bind_resp["data"]["datas"]:
            if data["info"]["name"] == package_name:
                target_id = data["ID"]
                break
        # BUGFIX: compare against None — a binding ID of 0 is falsy but valid.
        if target_id is None:
            raise ValueError(f"未找到package_name={package_name}的绑定ID")
        logger.info(f"[{task_name}] 第八步:获取绑定ID成功 | target_id: {target_id}")

        # Step 9: submit the training job set referencing the new binding.
        submit_url = "http://119.45.255.234:30180/jsm/jobSet/submit"
        task_res = task["resource"]
        submit_payload = {
            "userID": 5,
            "jobSetInfo": {
                "jobs": [
                    {
                        "localJobID": "1",
                        "name": task_name,
                        "description": "自动提交的CNN训练任务",
                        "type": "AI",
                        "files": {
                            "dataset": {"type": "Binding", "bindingID": target_id},
                            "model": {"type": "Binding", "bindingID": 421},
                            "image": {"type": "Image", "imageID": 11}
                        },
                        "jobResources": {
                            "scheduleStrategy": "dataLocality",
                            "clusters": [
                                {
                                    "clusterID": TARGET_CLUSTER_ID,
                                    "runtime": {"envs": {}, "params": {}},
                                    "code": {"type": "Binding", "bindingID": son_code_Id},
                                    "resources": [
                                        {"type": "CPU", "name": "ARM", "number": task_res["CPU"]},
                                        {"type": "MEMORY", "name": "RAM", "number": task_res["MEMORY"]},
                                        {"type": "MEMORY", "name": "VRAM", "number": 32},
                                        {"type": "STORAGE", "name": "DISK", "number": 32},
                                        {"type": "NPU", "name": "ASCEND910", "number": task_res["NPU"]}
                                    ]
                                }
                            ]
                        }
                    },
                    {"localJobID": "4", "type": "DataReturn", "targetLocalJobID": "1"}
                ]
            }
        }
        submit_resp = requests.post(submit_url, json=submit_payload, headers=headers, timeout=15).json()
        if submit_resp.get("code") != 200:
            raise ValueError(f"任务提交失败 | API返回: {submit_resp}")
        logger.info(f"[{task_name}] 第九步:任务提交成功 | 任务ID: {submit_resp.get('data', {}).get('jobSetID')}")

        # Mark success under the lock (monitor thread reads this field).
        with task_map_lock:
            task["status"] = "succeed"
        return True

    except Exception as e:
        error_msg = f"提交失败: {str(e)}"
        with task_map_lock:
            if task["retry_count"] >= task["max_retries"]:
                task["status"] = "retry_exhausted"
            else:
                # Below the retry budget: mark failed so the monitor re-queues it.
                task["status"] = "failed"
            task["error_msg"] = error_msg
        logger.error(f"[{task_name}] {error_msg}", exc_info=True)
        return False


def commit_tasks(available_resources: Dict[str, int], max_workers: int = 3) -> None:
    """Concurrently submit all pending tasks whose demand fits ``available_resources``.

    ``available_resources`` is a flat {"CPU": int, "MEMORY": int, "NPU": int} dict.
    """
    with task_map_lock:
        if not task_map:
            logger.warning("无待提交任务,退出提交流程")
            return
        # Snapshot the task list so iteration is safe while workers mutate state.
        current_tasks = list(task_map.values())

    # Filter to tasks that are pending AND fit within the available resources.
    eligible_tasks = []
    for task in current_tasks:
        if task["status"] != "submitted":
            logger.info(f"任务 {task['task_name']} 状态为 {task['status']},跳过提交")
            continue
        task_res = task["resource"]
        if (task_res["CPU"] <= available_resources["CPU"] and
                task_res["MEMORY"] <= available_resources["MEMORY"] and
                task_res["NPU"] <= available_resources["NPU"]):
            eligible_tasks.append(task)
            logger.info(f"任务 {task['task_name']} 资源满足,加入提交队列")
        else:
            logger.warning(f"任务 {task['task_name']} 资源不足 | 需求: {task_res} | 可用: {available_resources}")

    if not eligible_tasks:
        logger.info("无满足资源条件的任务,提交流程结束")
        return

    # Fan out submissions to a bounded thread pool.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(submit_single_task, task): task for task in eligible_tasks}
        for future in concurrent.futures.as_completed(futures):
            task = futures[future]
            try:
                future.result()  # surface any exception raised in the worker
            except Exception as e:
                with task_map_lock:
                    task["status"] = "error"
                    task["error_msg"] = f"执行异常: {str(e)}"
                logger.error(f"任务 {task['task_name']} 执行异常: {str(e)}", exc_info=True)


if __name__ == "__main__":
    # 1. Build the static task templates.
    task_templates = generate_tasks()

    # 2. Load them into the global task_map (adds retry bookkeeping fields).
    read_tasks(task_templates)

    # 3. Start the status-monitor thread (30-second polling interval).
    monitor_thread = TaskMonitorThread(check_interval=30)
    monitor_thread.start()

    # 4. Start the submitter thread, wired to the monitor for the completion signal.
    submit_thread = TaskSubmitThread(max_workers=30, monitor_thread=monitor_thread)
    submit_thread.start()

    # 5. Wait for both threads to finish.
    monitor_thread.join()
    submit_thread.join()