Delete commit_tasks.py

This commit is contained in:
qiwang 2025-07-21 18:58:38 +08:00
parent 658b668bb5
commit e33ce289e2
1 changed file with 0 additions and 467 deletions


@@ -1,467 +0,0 @@
import requests
import json
import concurrent.futures
import time
import queue
import logging
import os
from uuid import uuid4
from typing import Dict, List, Optional
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# Global task list (used to track the status of all tasks)
task_list: List[Dict] = []
def generate_tasks() -> List[Dict]:
"""生成任务模板列表(包含差异化配置)"""
base_tasks = [
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AA",
"dataset_name": "data1.zip",
"code_Id": 1,
"file_location": "D:/数据集/cnn数据集/data1/",
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AB",
"dataset_name": "cifar-10-python.tar.gz",
"file_location": "D:/数据集/cnn数据集/data2/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AC",
"dataset_name": "cifar-100-python.tar.gz",
"file_location": "D:/数据集/cnn数据集/data3/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AD",
"dataset_name": "cifar-100-python.tar.gz",
"file_location": "D:/数据集/cnn数据集/data3/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AE",
"dataset_name": "dev.jsonl",
"file_location": "D:/数据集/transfomer数据集/BoolQ/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AF",
"dataset_name": "ceval.zip",
"file_location": "D:/数据集/transfomer数据集/CEval/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AG",
"dataset_name": "CMMLU.zip",
"file_location": "D:/数据集/transfomer数据集/CMMLU/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AH",
"dataset_name": "mental_health.csv",
"file_location": "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AI",
"dataset_name": "GSM8K.jsonl",
"file_location": "D:/数据集/transfomer数据集/GSM8K/GSM8K/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AJ",
"dataset_name": "human-eval.jsonl",
"file_location": "D:/数据集/transfomer数据集/HumanEval/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
},
{
"task_name_template": "{prefix}-jointCloudAi-trainingtask",
"prefix": "AK",
"dataset_name": "HumanEval_X.zip",
"file_location": "D:/数据集/transfomer数据集/HumanEval_X/",
"code_Id": 1,
"CPU": 24,
"MEMORY": 256,
"NPU": 1
}
]
logger.info(f"成功生成 {len(base_tasks)} 个任务模板")
return base_tasks
def read_tasks(templates: List[Dict]) -> None:
"""将任务模板转换为可提交的任务列表(存储到全局 task_list"""
global task_list
task_list = [] # clear any previous tasks
if not templates:
logger.warning("Task template list is empty; skipping task creation")
return
for template in templates:
try:
# Build the basic task information
submit_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
task = {
"task_name": template["task_name_template"].format(prefix=template["prefix"]),
"package_name": f"{template['prefix'].lower()}-training-pkg", # 包名小写
"dataset_name": template["dataset_name"],
"code_Id": template["code_Id"],
"target_id": str(uuid4()), # 唯一任务ID
"resource": {
"CPU": template["CPU"],
"MEMORY": template["MEMORY"],
"NPU": template["NPU"]
},
"status": "submitted", # 初始状态:已提交待处理
"submit_time": submit_time_str,
"file_location": template["file_location"],
"error_msg": "" # 错误信息存储
}
task_list.append(task)
logger.info(f"任务创建成功 | task_name: {task['task_name']} | target_id: {task['target_id']}")
except KeyError as e:
logger.error(f"任务模板缺少字段: {e} | 模板内容: {template}")
except Exception as e:
logger.error(f"创建任务失败: {str(e)} | 模板内容: {template}")
def search_resource() -> Dict[str, int]:
"""查询集群可用资源实际场景需替换为真实API调用"""
# 模拟集群资源查询结果单位CPU核数MEMORY GBNPU卡数
available_resources = {
"CPU": 48, # 假设集群可用CPU为48核
"MEMORY": 512, # 可用内存512GB
"NPU": 2 # 可用NPU 2卡
}
logger.info(f"查询到集群可用资源: {available_resources}")
return available_resources
def get_token() -> Optional[str]:
"""获取认证Token实际场景需确保登录接口正确性"""
login_url = "http://119.45.255.234:30180/jcc-admin/admin/login"
login_payload = {"username": "admin", "password": "Nudt@123"}
try:
response = requests.post(login_url, json=login_payload, timeout=10)
response.raise_for_status() # raise on HTTP error status codes
result = response.json()
if result.get("code") == 200 and "data" in result and "token" in result["data"]:
logger.info("Token获取成功")
return result["data"]["token"]
else:
logger.error(f"Token获取失败 | 响应: {result}")
return None
except requests.exceptions.RequestException as e:
logger.error(f"登录请求异常: {str(e)}", exc_info=True)
return None
def submit_single_task(task: Dict) -> bool:
"""提交单个任务到集群(包含完整流程:创建文件夹→上传文件→提交任务)"""
token = get_token()
if not token:
task["status"] = "failed"
task["error_msg"] = "获取Token失败"
return False
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {token}'
}
task_name = task["task_name"]
package_name = task["package_name"]
file_name = task["dataset_name"]
file_location = task["file_location"]
son_code_Id = task["code_Id"]
file_path = os.path.join(file_location, file_name) # build the full dataset file path
try:
# ---------------------- Step 1: create the dataset folder ----------------------
create_url = "http://119.45.255.234:30180/jsm/jobSet/createPackage"
create_payload = {
"userID": 5,
"name": package_name,
"dataType": "dataset",
"packageID": 0,
"uploadPriority": {"type": "specify", "clusters": ["1790300942428540928"]},
"bindingInfo": {
"clusterIDs": ["1790300942428540928"],
"name": package_name,
"category": "image",
"type": "dataset",
"imageID": "",
"bias": [],
"region": [],
"chip": ["ASCEND"],
"selectedCluster": [],
"modelType": "",
"env": "",
"version": "",
"packageID": 0,
"points": 0
}
}
create_resp = requests.post(create_url, json=create_payload, headers=headers, timeout=15)
create_resp.raise_for_status()
create_result = create_resp.json()
if create_result.get("code") != 200:
raise ValueError(f"创建文件夹失败 | API返回: {create_result}")
packageID = create_result["data"]["newPackage"]["packageID"]
logger.info(f"[{task_name}] 第一步:创建文件夹成功 | packageID: {packageID}")
# ---------------------- 第三步:上传数据集文件 ----------------------
# 跳过第二步(查询列表),直接执行上传(实际场景可保留检查)
upload_url = "http://119.45.255.234:30180/jcs/object/upload"
if not os.path.exists(file_path):
raise FileNotFoundError(f"Dataset file not found | path: {file_path}")
info_data = {
"userID": 5,
"packageID": packageID,
"loadTo": [3],
"loadToPath": [f"/dataset/5/{package_name}/"]
}
file_headers = {'Authorization': f'Bearer {token}'}
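# Note (added for clarity): this is a multipart upload; the "info" part carries the JSON metadata and "files" streams the dataset file.
# Content-Type is deliberately not set here so requests can generate the multipart boundary itself.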
with open(file_path, 'rb') as f:
form_data = {"info": (None, json.dumps(info_data)), "files": f}
upload_resp = requests.post(upload_url, files=form_data, headers=file_headers, timeout=300) # 5-minute timeout for large files
upload_resp.raise_for_status()
upload_result = upload_resp.json()
if upload_result.get("code") != 200:
raise ValueError(f"文件上传失败 | API返回: {upload_result}")
object_id = upload_result["data"]["uploadeds"][0]["objectID"]
logger.info(f"[{task_name}] 第三步:文件上传成功 | objectID: {object_id}")
# ---------------------- 第四步:通知上传完成 ----------------------
notify_url = "http://119.45.255.234:30180/jsm/jobSet/notifyUploaded"
notify_payload = {
"userID": 5,
"packageID": packageID,
"uploadParams": {
"dataType": "dataset",
"uploadInfo": {"type": "local", "localPath": file_name, "objectIDs": [object_id]}
}
}
notify_resp = requests.post(notify_url, json=notify_payload, headers=headers, timeout=15)
notify_resp.raise_for_status()
notify_result = notify_resp.json()
if notify_result.get("code") != 200:
raise ValueError(f"通知上传完成失败 | API返回: {notify_result}")
logger.info(f"[{task_name}] 第四步:通知上传完成成功")
# ---------------------- 第七步:绑定数据集到集群 ----------------------
bind_url = "http://119.45.255.234:30180/jsm/jobSet/binding"
bind_payload = {
"userID": 5,
"info": {"type": "dataset", "packageID": packageID, "clusterIDs": ["1790300942428540928"]}
}
bind_resp = requests.post(bind_url, json=bind_payload, headers=headers, timeout=15)
bind_resp.raise_for_status()
bind_result = bind_resp.json()
if bind_result.get("code") != 200:
raise ValueError(f"绑定集群失败 | API返回: {bind_result}")
logger.info(f"[{task_name}] 第七步:数据集绑定集群成功")
# ---------------------- 第八步查询绑定ID ----------------------
query_bind_url = "http://119.45.255.234:30180/jsm/jobSet/queryBinding"
query_bind_payload = {
"dataType": "dataset",
"param": {"userID": 5, "bindingID": -1, "type": "private"}
}
query_bind_resp = requests.post(query_bind_url, json=query_bind_payload, headers=headers, timeout=15)
query_bind_resp.raise_for_status()
query_bind_result = query_bind_resp.json()
if query_bind_result.get("code") != 200:
raise ValueError(f"查询绑定失败 | API返回: {query_bind_result}")
# 提取目标绑定ID
target_id = None
for data in query_bind_result["data"]["datas"]:
if data["info"]["name"] == package_name:
target_id = data["ID"]
break
if not target_id:
raise ValueError(f"未找到package_name={package_name}的绑定ID")
logger.info(f"[{task_name}] 第八步获取绑定ID成功 | target_id: {target_id}")
# ---------------------- 第九步:提交训练任务 ----------------------
submit_url = "http://119.45.255.234:30180/jsm/jobSet/submit"
task_res = task["resource"]
submit_payload = {
"userID": 5,
"jobSetInfo": {
"jobs": [
{
"localJobID": "1",
"name": task_name,
"description": "自动提交的CNN训练任务",
"type": "AI",
"files": {
"dataset": {"type": "Binding", "bindingID": target_id},
"model": {"type": "Binding", "bindingID": 421}, # 固定模型绑定ID
"image": {"type": "Image", "imageID": 11} # 固定镜像ID
},
"jobResources": {
"scheduleStrategy": "dataLocality",
"clusters": [
{
"clusterID": "1790300942428540928",
"runtime": {"envs": {}, "params": {}},
"code": {"type": "Binding", "bindingID": son_code_Id},
"resources": [
{"type": "CPU", "name": "ARM", "number": task_res["CPU"]},
{"type": "MEMORY", "name": "RAM", "number": task_res["MEMORY"]},
{"type": "MEMORY", "name": "VRAM", "number": 32}, # 固定显存
{"type": "STORAGE", "name": "DISK", "number": 32}, # 固定存储
{"type": "NPU", "name": "ASCEND910", "number": task_res["NPU"]}
]
}
]
}
},
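# Companion "DataReturn" job below references the training job via targetLocalJobID "1",
# presumably to return its output data once training finishes.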
{"localJobID": "4", "type": "DataReturn", "targetLocalJobID": "1"}
]
}
}
submit_resp = requests.post(submit_url, json=submit_payload, headers=headers, timeout=15)
submit_resp.raise_for_status()
submit_result = submit_resp.json()
if submit_result.get("code") != 200:
raise ValueError(f"任务提交失败 | API返回: {submit_result}")
logger.info(f"[{task_name}] 第九步:任务提交成功 | 任务ID: {submit_result.get('data', {}).get('jobSetID')}")
# 更新任务状态为成功
task["status"] = "submitted_success"
return True
except Exception as e:
error_msg = f"提交失败: {str(e)}"
task["status"] = "failed"
task["error_msg"] = error_msg
logger.error(f"[{task_name}] {error_msg}", exc_info=True)
return False
def commit_tasks(available_resources: Dict[str, int], max_workers: int = 3) -> None:
"""提交满足资源条件的任务(并发处理)"""
global task_list
if not task_list:
logger.warning("无待提交任务,退出提交流程")
return
# Filter submittable tasks: status is "submitted" and resource requirements are met
eligible_tasks = []
for task in task_list:
if task["status"] != "submitted":
logger.info(f"任务 {task['task_name']} 状态为 {task['status']},跳过提交")
continue
task_res = task["resource"]
# Check that CPU, memory, and NPU requirements are all satisfied
if (task_res["CPU"] <= available_resources["CPU"] and
task_res["MEMORY"] <= available_resources["MEMORY"] and
task_res["NPU"] <= available_resources["NPU"]):
eligible_tasks.append(task)
logger.info(f"任务 {task['task_name']} 资源满足,加入提交队列")
else:
logger.warning(f"任务 {task['task_name']} 资源不足 | 需求: {task_res} | 可用: {available_resources}")
if not eligible_tasks:
logger.info("无满足资源条件的任务,提交流程结束")
return
# Submit tasks concurrently (cap the number of workers to avoid overloading the API)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit the tasks and keep a mapping of Future objects to tasks
futures = {executor.submit(submit_single_task, task): task for task in eligible_tasks}
# Handle task results as they complete
for future in concurrent.futures.as_completed(futures):
task = futures[future]
try:
future.result() # re-raise any exception from the worker
except Exception as e:
logger.error(f"任务 {task['task_name']} 执行异常: {str(e)}", exc_info=True)
task["status"] = "error"
task["error_msg"] = f"执行异常: {str(e)}"
if __name__ == "__main__":
# 1. Generate task templates
task_templates = generate_tasks()
# 2. Build the task list (global task_list)
read_tasks(task_templates)
# 3. Verify the task list
logger.info(f"\nTask list initialized | total tasks: {len(task_list)}")
if task_list:
sample_task = task_list[0]
logger.info(f"示例任务详情: {json.dumps(sample_task, indent=2, ensure_ascii=False)}")
# 4. 查询集群可用资源
available_resources = search_resource()
# 5. Submit tasks that fit the available resources (3 concurrent workers)
commit_tasks(available_resources, max_workers=3)
# 6. Print the final task status summary
logger.info("\n===== Task submission summary =====")
success_count = sum(1 for t in task_list if t["status"] == "submitted_success")
failed_count = sum(1 for t in task_list if t["status"] == "failed")
pending_count = sum(1 for t in task_list if t["status"] == "submitted")
error_count = sum(1 for t in task_list if t["status"] == "error")
logger.info(
f"Total tasks: {len(task_list)} | succeeded: {success_count} | failed: {failed_count} | pending: {pending_count} | errored: {error_count}")
for task in task_list:
logger.info(
f"task_name: {task['task_name']} | status: {task['status']} | error: {task['error_msg'][:100]}") # 截断长错误信息