From e33ce289e27436d853896fea0ce2cbe92c539094 Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Mon, 21 Jul 2025 18:58:38 +0800 Subject: [PATCH] Delete commit_tasks.py --- commit_tasks.py | 467 ------------------------------------------------ 1 file changed, 467 deletions(-) delete mode 100644 commit_tasks.py diff --git a/commit_tasks.py b/commit_tasks.py deleted file mode 100644 index 265a4a9..0000000 --- a/commit_tasks.py +++ /dev/null @@ -1,467 +0,0 @@ -import requests -import json -import concurrent.futures -import time -import queue -import logging -import os -from uuid import uuid4 -from typing import Dict, List, Optional - -# 日志配置 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[logging.StreamHandler()] -) -logger = logging.getLogger(__name__) - -# 全局任务列表(用于跟踪所有任务状态) -task_list: List[Dict] = [] - - -def generate_tasks() -> List[Dict]: - """生成任务模板列表(包含差异化配置)""" - base_tasks = [ - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AA", - "dataset_name": "data1.zip", - "code_Id": 1, - "file_location": "D:/数据集/cnn数据集/data1/", - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AB", - "dataset_name": "cifar-10-python.tar.gz", - "file_location": "D:/数据集/cnn数据集/data2/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AC", - "dataset_name": "cifar-100-python.tar.gz", - "file_location": "D:/数据集/cnn数据集/data3/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AD", - "dataset_name": "cifar-100-python.tar.gz", - "file_location": "D:/数据集/cnn数据集/data3/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AE", - "dataset_name": "dev.jsonl", - "file_location": "D:/数据集/transfomer数据集/BoolQ/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AF", - "dataset_name": "ceval.zip", - "file_location": "D:/数据集/transfomer数据集/CEval/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AG", - "dataset_name": "CMMLU.zip", - "file_location": "D:/数据集/transfomer数据集/CMMLU/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AH", - "dataset_name": "mental_health.csv", - "file_location": "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AI", - "dataset_name": "GSM8K.jsonl", - "file_location": "D:/数据集/transfomer数据集/GSM8K/GSM8K/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AJ", - "dataset_name": "human-eval.jsonl", - "file_location": "D:/数据集/transfomer数据集/HumanEval/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - }, - { - "task_name_template": "{prefix}-jointCloudAi-trainingtask", - "prefix": "AK", - "dataset_name": "HumanEval_X.zip", - "file_location": "D:/数据集/transfomer数据集/HumanEval_X/", - "code_Id": 1, - "CPU": 24, - "MEMORY": 256, - "NPU": 1 - } - ] - logger.info(f"成功生成 {len(base_tasks)} 个任务模板") - return base_tasks - - -def read_tasks(templates: List[Dict]) -> None: - """将任务模板转换为可提交的任务列表(存储到全局 task_list)""" - global task_list - task_list = [] # 清空历史任务 - if not templates: - logger.warning("任务模板为空,跳过任务创建") - return - - for template in templates: - try: - # 生成任务基本信息 - submit_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - task = { - "task_name": template["task_name_template"].format(prefix=template["prefix"]), - "package_name": f"{template['prefix'].lower()}-training-pkg", # 包名小写 - "dataset_name": template["dataset_name"], - "code_Id": template["code_Id"], - "target_id": str(uuid4()), # 唯一任务ID - "resource": { - "CPU": template["CPU"], - "MEMORY": template["MEMORY"], - "NPU": template["NPU"] - }, - "status": "submitted", # 初始状态:已提交待处理 - "submit_time": submit_time_str, - "file_location": template["file_location"], - "error_msg": "" # 错误信息存储 - } - task_list.append(task) - logger.info(f"任务创建成功 | task_name: {task['task_name']} | target_id: {task['target_id']}") - except KeyError as e: - logger.error(f"任务模板缺少字段: {e} | 模板内容: {template}") - except Exception as e: - logger.error(f"创建任务失败: {str(e)} | 模板内容: {template}") - - -def search_resource() -> Dict[str, int]: - """查询集群可用资源(实际场景需替换为真实API调用)""" - # 模拟集群资源查询结果(单位:CPU核数,MEMORY GB,NPU卡数) - available_resources = { - "CPU": 48, # 假设集群可用CPU为48核 - "MEMORY": 512, # 可用内存512GB - "NPU": 2 # 可用NPU 2卡 - } - logger.info(f"查询到集群可用资源: {available_resources}") - return available_resources - - -def get_token() -> Optional[str]: - """获取认证Token(实际场景需确保登录接口正确性)""" - login_url = "http://119.45.255.234:30180/jcc-admin/admin/login" - login_payload = {"username": "admin", "password": "Nudt@123"} - try: - response = requests.post(login_url, json=login_payload, timeout=10) - response.raise_for_status() # 触发HTTP错误状态码异常 - result = response.json() - if result.get("code") == 200 and "data" in result and "token" in result["data"]: - logger.info("Token获取成功") - return result["data"]["token"] - else: - logger.error(f"Token获取失败 | 响应: {result}") - return None - except requests.exceptions.RequestException as e: - logger.error(f"登录请求异常: {str(e)}", exc_info=True) - return None - - -def submit_single_task(task: Dict) -> bool: - """提交单个任务到集群(包含完整流程:创建文件夹→上传文件→提交任务)""" - token = get_token() - if not token: - task["status"] = "failed" - task["error_msg"] = "获取Token失败" - return False - - headers = { - 'Content-Type': 'application/json', - 'Authorization': f'Bearer {token}' - } - task_name = task["task_name"] - package_name = task["package_name"] - file_name = task["dataset_name"] - file_location = task["file_location"] - son_code_Id = task["code_Id"] - file_path = os.path.join(file_location, file_name) # 构建完整文件路径 - - try: - # ---------------------- 第一步:创建数据集文件夹 ---------------------- - create_url = "http://119.45.255.234:30180/jsm/jobSet/createPackage" - create_payload = { - "userID": 5, - "name": package_name, - "dataType": "dataset", - "packageID": 0, - "uploadPriority": {"type": "specify", "clusters": ["1790300942428540928"]}, - "bindingInfo": { - "clusterIDs": ["1790300942428540928"], - "name": package_name, - "category": "image", - "type": "dataset", - "imageID": "", - "bias": [], - "region": [], - "chip": ["ASCEND"], - "selectedCluster": [], - "modelType": "", - "env": "", - "version": "", - "packageID": 0, - "points": 0 - } - } - create_resp = requests.post(create_url, json=create_payload, headers=headers, timeout=15) - create_resp.raise_for_status() - create_result = create_resp.json() - if create_result.get("code") != 200: - raise ValueError(f"创建文件夹失败 | API返回: {create_result}") - packageID = create_result["data"]["newPackage"]["packageID"] - logger.info(f"[{task_name}] 第一步:创建文件夹成功 | packageID: {packageID}") - - # ---------------------- 第三步:上传数据集文件 ---------------------- - # 跳过第二步(查询列表),直接执行上传(实际场景可保留检查) - upload_url = "http://119.45.255.234:30180/jcs/object/upload" - if not os.path.exists(file_path): - raise FileNotFoundError(f"数据集文件不存在 | path: {file_path}") - - info_data = { - "userID": 5, - "packageID": packageID, - "loadTo": [3], - "loadToPath": [f"/dataset/5/{package_name}/"] - } - file_headers = {'Authorization': f'Bearer {token}'} - with open(file_path, 'rb') as f: - form_data = {"info": (None, json.dumps(info_data)), "files": f} - upload_resp = requests.post(upload_url, files=form_data, headers=file_headers, timeout=300) # 大文件超时设为5分钟 - upload_resp.raise_for_status() - upload_result = upload_resp.json() - if upload_result.get("code") != 200: - raise ValueError(f"文件上传失败 | API返回: {upload_result}") - object_id = upload_result["data"]["uploadeds"][0]["objectID"] - logger.info(f"[{task_name}] 第三步:文件上传成功 | objectID: {object_id}") - - # ---------------------- 第四步:通知上传完成 ---------------------- - notify_url = "http://119.45.255.234:30180/jsm/jobSet/notifyUploaded" - notify_payload = { - "userID": 5, - "packageID": packageID, - "uploadParams": { - "dataType": "dataset", - "uploadInfo": {"type": "local", "localPath": file_name, "objectIDs": [object_id]} - } - } - notify_resp = requests.post(notify_url, json=notify_payload, headers=headers, timeout=15) - notify_resp.raise_for_status() - notify_result = notify_resp.json() - if notify_result.get("code") != 200: - raise ValueError(f"通知上传完成失败 | API返回: {notify_result}") - logger.info(f"[{task_name}] 第四步:通知上传完成成功") - - # ---------------------- 第七步:绑定数据集到集群 ---------------------- - bind_url = "http://119.45.255.234:30180/jsm/jobSet/binding" - bind_payload = { - "userID": 5, - "info": {"type": "dataset", "packageID": packageID, "clusterIDs": ["1790300942428540928"]} - } - bind_resp = requests.post(bind_url, json=bind_payload, headers=headers, timeout=15) - bind_resp.raise_for_status() - bind_result = bind_resp.json() - if bind_result.get("code") != 200: - raise ValueError(f"绑定集群失败 | API返回: {bind_result}") - logger.info(f"[{task_name}] 第七步:数据集绑定集群成功") - - # ---------------------- 第八步:查询绑定ID ---------------------- - query_bind_url = "http://119.45.255.234:30180/jsm/jobSet/queryBinding" - query_bind_payload = { - "dataType": "dataset", - "param": {"userID": 5, "bindingID": -1, "type": "private"} - } - query_bind_resp = requests.post(query_bind_url, json=query_bind_payload, headers=headers, timeout=15) - query_bind_resp.raise_for_status() - query_bind_result = query_bind_resp.json() - if query_bind_result.get("code") != 200: - raise ValueError(f"查询绑定失败 | API返回: {query_bind_result}") - - # 提取目标绑定ID - target_id = None - for data in query_bind_result["data"]["datas"]: - if data["info"]["name"] == package_name: - target_id = data["ID"] - break - if not target_id: - raise ValueError(f"未找到package_name={package_name}的绑定ID") - logger.info(f"[{task_name}] 第八步:获取绑定ID成功 | target_id: {target_id}") - - # ---------------------- 第九步:提交训练任务 ---------------------- - submit_url = "http://119.45.255.234:30180/jsm/jobSet/submit" - task_res = task["resource"] - submit_payload = { - "userID": 5, - "jobSetInfo": { - "jobs": [ - { - "localJobID": "1", - "name": task_name, - "description": "自动提交的CNN训练任务", - "type": "AI", - "files": { - "dataset": {"type": "Binding", "bindingID": target_id}, - "model": {"type": "Binding", "bindingID": 421}, # 固定模型绑定ID - "image": {"type": "Image", "imageID": 11} # 固定镜像ID - }, - "jobResources": { - "scheduleStrategy": "dataLocality", - "clusters": [ - { - "clusterID": "1790300942428540928", - "runtime": {"envs": {}, "params": {}}, - "code": {"type": "Binding", "bindingID": son_code_Id}, - "resources": [ - {"type": "CPU", "name": "ARM", "number": task_res["CPU"]}, - {"type": "MEMORY", "name": "RAM", "number": task_res["MEMORY"]}, - {"type": "MEMORY", "name": "VRAM", "number": 32}, # 固定显存 - {"type": "STORAGE", "name": "DISK", "number": 32}, # 固定存储 - {"type": "NPU", "name": "ASCEND910", "number": task_res["NPU"]} - ] - } - ] - } - }, - {"localJobID": "4", "type": "DataReturn", "targetLocalJobID": "1"} - ] - } - } - submit_resp = requests.post(submit_url, json=submit_payload, headers=headers, timeout=15) - submit_resp.raise_for_status() - submit_result = submit_resp.json() - if submit_result.get("code") != 200: - raise ValueError(f"任务提交失败 | API返回: {submit_result}") - logger.info(f"[{task_name}] 第九步:任务提交成功 | 任务ID: {submit_result.get('data', {}).get('jobSetID')}") - - # 更新任务状态为成功 - task["status"] = "submitted_success" - return True - - except Exception as e: - error_msg = f"提交失败: {str(e)}" - task["status"] = "failed" - task["error_msg"] = error_msg - logger.error(f"[{task_name}] {error_msg}", exc_info=True) - return False - - -def commit_tasks(available_resources: Dict[str, int], max_workers: int = 3) -> None: - """提交满足资源条件的任务(并发处理)""" - global task_list - if not task_list: - logger.warning("无待提交任务,退出提交流程") - return - - # 筛选可提交任务(状态为submitted且资源满足) - eligible_tasks = [] - for task in task_list: - if task["status"] != "submitted": - logger.info(f"任务 {task['task_name']} 状态为 {task['status']},跳过提交") - continue - task_res = task["resource"] - # 检查CPU、内存、NPU是否均满足 - if (task_res["CPU"] <= available_resources["CPU"] and - task_res["MEMORY"] <= available_resources["MEMORY"] and - task_res["NPU"] <= available_resources["NPU"]): - eligible_tasks.append(task) - logger.info(f"任务 {task['task_name']} 资源满足,加入提交队列") - else: - logger.warning(f"任务 {task['task_name']} 资源不足 | 需求: {task_res} | 可用: {available_resources}") - - if not eligible_tasks: - logger.info("无满足资源条件的任务,提交流程结束") - return - - # 并发提交任务(限制最大并发数,避免请求过载) - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - # 提交任务并跟踪Future对象 - futures = {executor.submit(submit_single_task, task): task for task in eligible_tasks} - # 处理任务结果 - for future in concurrent.futures.as_completed(futures): - task = futures[future] - try: - future.result() # 触发可能的异常 - except Exception as e: - logger.error(f"任务 {task['task_name']} 执行异常: {str(e)}", exc_info=True) - task["status"] = "error" - task["error_msg"] = f"执行异常: {str(e)}" - - -if __name__ == "__main__": - # 1. 生成任务模板 - task_templates = generate_tasks() - - # 2. 创建任务列表(全局task_list) - read_tasks(task_templates) - - # 3. 验证任务列表 - logger.info(f"\n任务列表初始化完成,总任务数: {len(task_list)}") - if task_list: - sample_task = task_list[0] - logger.info(f"示例任务详情: {json.dumps(sample_task, indent=2, ensure_ascii=False)}") - - # 4. 查询集群可用资源 - available_resources = search_resource() - - # 5. 提交满足资源条件的任务(并发数3) - commit_tasks(available_resources, max_workers=3) - - # 6. 输出最终任务状态汇总 - logger.info("\n===== 任务提交结果汇总 =====") - success_count = sum(1 for t in task_list if t["status"] == "succeed") - failed_count = sum(1 for t in task_list if t["status"] == "failed") - pending_count = sum(1 for t in task_list if t["status"] == "pending") - running_count = sum(1 for t in task_list if t["status"] == "Running") - error_count = sum(1 for t in task_list if t["status"] == "error") - - logger.info( - f"总任务数: {len(task_list)} | 成功: {success_count} | 失败: {failed_count} | 待提交: {pending_count} | 异常: {error_count}") - for task in task_list: - logger.info( - f"task_name: {task['task_name']} | status: {task['status']} | error: {task['error_msg'][:100]}") # 截断长错误信息 \ No newline at end of file