# SchedulingSimulator/commit_tasks.py
import requests
import json
import concurrent.futures
import time
import queue
import logging
import os
from uuid import uuid4
from typing import Dict, List, Optional
# Logging configuration: INFO level, timestamped messages to a stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# Global registry used to track the state of every generated task.
task_list: List[Dict] = []
def generate_tasks() -> List[Dict]:
    """Build the list of task templates (one per dataset).

    Every template shares the same name pattern, code id and resource
    request (CPU=24 cores, MEMORY=256 GB, NPU=1 card); only the prefix,
    dataset file name and its local location differ, so the templates
    are generated from a compact variant table instead of 11 repeated
    dict literals.

    Returns:
        A list of dicts, each with the keys: task_name_template,
        prefix, dataset_name, file_location, code_Id, CPU, MEMORY, NPU.
    """
    common = {
        "task_name_template": "{prefix}-jointCloudAi-trainingtask",
        "code_Id": 1,
        "CPU": 24,
        "MEMORY": 256,
        "NPU": 1,
    }
    # (prefix, dataset_name, file_location)
    variants = [
        ("AA", "data1.zip", "D:/数据集/cnn数据集/data1/"),
        ("AB", "cifar-10-python.tar.gz", "D:/数据集/cnn数据集/data2/"),
        ("AC", "cifar-100-python.tar.gz", "D:/数据集/cnn数据集/data3/"),
        ("AD", "cifar-100-python.tar.gz", "D:/数据集/cnn数据集/data3/"),
        ("AE", "dev.jsonl", "D:/数据集/transfomer数据集/BoolQ/"),
        ("AF", "ceval.zip", "D:/数据集/transfomer数据集/CEval/"),
        ("AG", "CMMLU.zip", "D:/数据集/transfomer数据集/CMMLU/"),
        ("AH", "mental_health.csv", "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/"),
        ("AI", "GSM8K.jsonl", "D:/数据集/transfomer数据集/GSM8K/GSM8K/"),
        ("AJ", "human-eval.jsonl", "D:/数据集/transfomer数据集/HumanEval/"),
        ("AK", "HumanEval_X.zip", "D:/数据集/transfomer数据集/HumanEval_X/"),
    ]
    base_tasks = [
        {**common, "prefix": prefix, "dataset_name": dataset, "file_location": location}
        for prefix, dataset, location in variants
    ]
    logger.info(f"成功生成 {len(base_tasks)} 个任务模板")
    return base_tasks
def read_tasks(templates: List[Dict]) -> None:
    """Expand task templates into submittable tasks stored in the global task_list.

    Each task gets a unique target_id, a submit timestamp, an initial
    status of "submitted" and an empty error_msg slot. Templates with
    missing fields are logged and skipped rather than aborting the run.
    """
    global task_list
    task_list = []  # drop any tasks from a previous run
    if not templates:
        logger.warning("任务模板为空,跳过任务创建")
        return
    for tpl in templates:
        try:
            stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            task = {
                "task_name": tpl["task_name_template"].format(prefix=tpl["prefix"]),
                "package_name": f"{tpl['prefix'].lower()}-training-pkg",  # package names are lower-case
                "dataset_name": tpl["dataset_name"],
                "code_Id": tpl["code_Id"],
                "target_id": str(uuid4()),  # unique per-task id
                "resource": {key: tpl[key] for key in ("CPU", "MEMORY", "NPU")},
                "status": "submitted",  # initial state: submitted, awaiting processing
                "submit_time": stamp,
                "file_location": tpl["file_location"],
                "error_msg": "",  # populated on failure
            }
        except KeyError as e:
            logger.error(f"任务模板缺少字段: {e} | 模板内容: {tpl}")
        except Exception as e:
            logger.error(f"创建任务失败: {str(e)} | 模板内容: {tpl}")
        else:
            task_list.append(task)
            logger.info(f"任务创建成功 | task_name: {task['task_name']} | target_id: {task['target_id']}")
def search_resource() -> Dict[str, int]:
    """Query the cluster's available resources.

    This is a stub returning fixed values (units: CPU cores, MEMORY in
    GB, NPU cards); replace with a real API call in production.
    """
    available_resources = dict(CPU=48, MEMORY=512, NPU=2)
    logger.info(f"查询到集群可用资源: {available_resources}")
    return available_resources
def get_token() -> Optional[str]:
    """Obtain an auth token from the admin login endpoint.

    Credentials may be overridden via the JCC_USERNAME / JCC_PASSWORD
    environment variables; the previous hard-coded values remain the
    defaults for backward compatibility.
    SECURITY NOTE: keeping credentials in source is a risk — prefer the
    environment variables (or a secrets manager).

    Returns:
        The bearer token string on success, ``None`` on any failure
        (HTTP error, network error, or a non-200 API-level code).
    """
    login_url = "http://119.45.255.234:30180/jcc-admin/admin/login"
    login_payload = {
        "username": os.environ.get("JCC_USERNAME", "admin"),
        "password": os.environ.get("JCC_PASSWORD", "Nudt@123"),
    }
    try:
        response = requests.post(login_url, json=login_payload, timeout=10)
        response.raise_for_status()  # raise on HTTP error status codes
        result = response.json()
        if result.get("code") == 200 and "data" in result and "token" in result["data"]:
            logger.info("Token获取成功")
            return result["data"]["token"]
        logger.error(f"Token获取失败 | 响应: {result}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"登录请求异常: {str(e)}", exc_info=True)
        return None
# Cluster endpoint and identifiers shared by every submission step.
# NOTE(review): these were repeated inline throughout the original code;
# TODO confirm they belong in configuration rather than source.
_API_BASE = "http://119.45.255.234:30180"
_CLUSTER_ID = "1790300942428540928"
_USER_ID = 5


def _post_json(url: str, payload: Dict, headers: Dict, fail_msg: str) -> Dict:
    """POST *payload* as JSON and return the decoded response body.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        ValueError: when the API envelope ``code`` is not 200; the
            message is ``f"{fail_msg} | API返回: {result}"``.
    """
    resp = requests.post(url, json=payload, headers=headers, timeout=15)
    resp.raise_for_status()
    result = resp.json()
    if result.get("code") != 200:
        raise ValueError(f"{fail_msg} | API返回: {result}")
    return result


def submit_single_task(task: Dict) -> bool:
    """Submit one task through the full pipeline:
    create package → upload file → notify → bind → resolve binding ID → submit job.

    Mutates ``task["status"]`` and ``task["error_msg"]`` in place.

    Returns:
        True when the job was submitted successfully, False otherwise.
    """
    token = get_token()
    if not token:
        task["status"] = "failed"
        task["error_msg"] = "获取Token失败"
        return False
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}'
    }
    task_name = task["task_name"]
    package_name = task["package_name"]
    file_name = task["dataset_name"]
    son_code_Id = task["code_Id"]
    file_path = os.path.join(task["file_location"], file_name)  # full local dataset path
    try:
        # ---- Step 1: create the dataset package (folder) on the cluster ----
        create_payload = {
            "userID": _USER_ID,
            "name": package_name,
            "dataType": "dataset",
            "packageID": 0,
            "uploadPriority": {"type": "specify", "clusters": [_CLUSTER_ID]},
            "bindingInfo": {
                "clusterIDs": [_CLUSTER_ID],
                "name": package_name,
                "category": "image",
                "type": "dataset",
                "imageID": "",
                "bias": [],
                "region": [],
                "chip": ["ASCEND"],
                "selectedCluster": [],
                "modelType": "",
                "env": "",
                "version": "",
                "packageID": 0,
                "points": 0
            }
        }
        create_result = _post_json(f"{_API_BASE}/jsm/jobSet/createPackage",
                                   create_payload, headers, "创建文件夹失败")
        packageID = create_result["data"]["newPackage"]["packageID"]
        logger.info(f"[{task_name}] 第一步:创建文件夹成功 | packageID: {packageID}")

        # ---- Step 3: upload the dataset file ----
        # (Step 2, a listing query, is deliberately skipped.)
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据集文件不存在 | path: {file_path}")
        info_data = {
            "userID": _USER_ID,
            "packageID": packageID,
            "loadTo": [3],
            "loadToPath": [f"/dataset/{_USER_ID}/{package_name}/"]
        }
        with open(file_path, 'rb') as f:
            # Multipart upload; 5-minute timeout to accommodate large files.
            upload_resp = requests.post(
                f"{_API_BASE}/jcs/object/upload",
                files={"info": (None, json.dumps(info_data)), "files": f},
                headers={'Authorization': f'Bearer {token}'},
                timeout=300)
        upload_resp.raise_for_status()
        upload_result = upload_resp.json()
        if upload_result.get("code") != 200:
            raise ValueError(f"文件上传失败 | API返回: {upload_result}")
        object_id = upload_result["data"]["uploadeds"][0]["objectID"]
        logger.info(f"[{task_name}] 第三步:文件上传成功 | objectID: {object_id}")

        # ---- Step 4: notify the job service that the upload finished ----
        notify_payload = {
            "userID": _USER_ID,
            "packageID": packageID,
            "uploadParams": {
                "dataType": "dataset",
                "uploadInfo": {"type": "local", "localPath": file_name, "objectIDs": [object_id]}
            }
        }
        _post_json(f"{_API_BASE}/jsm/jobSet/notifyUploaded",
                   notify_payload, headers, "通知上传完成失败")
        logger.info(f"[{task_name}] 第四步:通知上传完成成功")

        # ---- Step 7: bind the dataset package to the target cluster ----
        bind_payload = {
            "userID": _USER_ID,
            "info": {"type": "dataset", "packageID": packageID, "clusterIDs": [_CLUSTER_ID]}
        }
        _post_json(f"{_API_BASE}/jsm/jobSet/binding", bind_payload, headers, "绑定集群失败")
        logger.info(f"[{task_name}] 第七步:数据集绑定集群成功")

        # ---- Step 8: look up the binding ID for our package name ----
        query_bind_payload = {
            "dataType": "dataset",
            "param": {"userID": _USER_ID, "bindingID": -1, "type": "private"}
        }
        query_bind_result = _post_json(f"{_API_BASE}/jsm/jobSet/queryBinding",
                                       query_bind_payload, headers, "查询绑定失败")
        target_id = next(
            (entry["ID"] for entry in query_bind_result["data"]["datas"]
             if entry["info"]["name"] == package_name),
            None)
        # Fixed: the original used `if not target_id`, which would reject a
        # legitimate binding ID of 0; only a missing entry is an error.
        if target_id is None:
            raise ValueError(f"未找到package_name={package_name}的绑定ID")
        logger.info(f"[{task_name}] 第八步获取绑定ID成功 | target_id: {target_id}")

        # ---- Step 9: submit the training job itself ----
        task_res = task["resource"]
        submit_payload = {
            "userID": _USER_ID,
            "jobSetInfo": {
                "jobs": [
                    {
                        "localJobID": "1",
                        "name": task_name,
                        "description": "自动提交的CNN训练任务",
                        "type": "AI",
                        "files": {
                            "dataset": {"type": "Binding", "bindingID": target_id},
                            "model": {"type": "Binding", "bindingID": 421},  # fixed model binding ID
                            "image": {"type": "Image", "imageID": 11}  # fixed image ID
                        },
                        "jobResources": {
                            "scheduleStrategy": "dataLocality",
                            "clusters": [
                                {
                                    "clusterID": _CLUSTER_ID,
                                    "runtime": {"envs": {}, "params": {}},
                                    "code": {"type": "Binding", "bindingID": son_code_Id},
                                    "resources": [
                                        {"type": "CPU", "name": "ARM", "number": task_res["CPU"]},
                                        {"type": "MEMORY", "name": "RAM", "number": task_res["MEMORY"]},
                                        {"type": "MEMORY", "name": "VRAM", "number": 32},  # fixed VRAM
                                        {"type": "STORAGE", "name": "DISK", "number": 32},  # fixed storage
                                        {"type": "NPU", "name": "ASCEND910", "number": task_res["NPU"]}
                                    ]
                                }
                            ]
                        }
                    },
                    {"localJobID": "4", "type": "DataReturn", "targetLocalJobID": "1"}
                ]
            }
        }
        submit_result = _post_json(f"{_API_BASE}/jsm/jobSet/submit",
                                   submit_payload, headers, "任务提交失败")
        logger.info(f"[{task_name}] 第九步:任务提交成功 | 任务ID: {submit_result.get('data', {}).get('jobSetID')}")
        task["status"] = "submitted_success"
        return True
    except Exception as e:
        error_msg = f"提交失败: {str(e)}"
        task["status"] = "failed"
        task["error_msg"] = error_msg
        logger.error(f"[{task_name}] {error_msg}", exc_info=True)
        return False
def commit_tasks(available_resources: Dict[str, int], max_workers: int = 3) -> None:
    """Concurrently submit every task whose resources fit the cluster.

    A task is eligible when its status is "submitted" and each of its
    CPU / MEMORY / NPU demands fits within *available_resources*.
    NOTE(review): capacity is not decremented as tasks are accepted, so
    every task is checked against the full capacity — confirm intended.
    """
    global task_list
    if not task_list:
        logger.warning("无待提交任务,退出提交流程")
        return
    # Filter down to tasks that are still pending and fit the resources.
    eligible_tasks = []
    for task in task_list:
        if task["status"] != "submitted":
            logger.info(f"任务 {task['task_name']} 状态为 {task['status']},跳过提交")
            continue
        task_res = task["resource"]
        fits = all(task_res[kind] <= available_resources[kind]
                   for kind in ("CPU", "MEMORY", "NPU"))
        if fits:
            eligible_tasks.append(task)
            logger.info(f"任务 {task['task_name']} 资源满足,加入提交队列")
        else:
            logger.warning(f"任务 {task['task_name']} 资源不足 | 需求: {task_res} | 可用: {available_resources}")
    if not eligible_tasks:
        logger.info("无满足资源条件的任务,提交流程结束")
        return
    # Bounded thread pool keeps us from flooding the cluster API.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(submit_single_task, t): t for t in eligible_tasks}
        for done in concurrent.futures.as_completed(pending):
            task = pending[done]
            try:
                done.result()  # surface any exception raised in the worker
            except Exception as e:
                logger.error(f"任务 {task['task_name']} 执行异常: {str(e)}", exc_info=True)
                task["status"] = "error"
                task["error_msg"] = f"执行异常: {str(e)}"
if __name__ == "__main__":
# 1. 生成任务模板
task_templates = generate_tasks()
# 2. 创建任务列表全局task_list
read_tasks(task_templates)
# 3. 验证任务列表
logger.info(f"\n任务列表初始化完成,总任务数: {len(task_list)}")
if task_list:
sample_task = task_list[0]
logger.info(f"示例任务详情: {json.dumps(sample_task, indent=2, ensure_ascii=False)}")
# 4. 查询集群可用资源
available_resources = search_resource()
# 5. 提交满足资源条件的任务并发数3
commit_tasks(available_resources, max_workers=3)
# 6. 输出最终任务状态汇总
logger.info("\n===== 任务提交结果汇总 =====")
success_count = sum(1 for t in task_list if t["status"] == "succeed")
failed_count = sum(1 for t in task_list if t["status"] == "failed")
pending_count = sum(1 for t in task_list if t["status"] == "pending")
running_count = sum(1 for t in task_list if t["status"] == "Running")
error_count = sum(1 for t in task_list if t["status"] == "error")
logger.info(
f"总任务数: {len(task_list)} | 成功: {success_count} | 失败: {failed_count} | 待提交: {pending_count} | 异常: {error_count}")
for task in task_list:
logger.info(
f"task_name: {task['task_name']} | status: {task['status']} | error: {task['error_msg'][:100]}") # 截断长错误信息