SchedulingSimulator/commit_tasks_0715.py

import requests
import json
import concurrent.futures
import time
import logging
import os
import threading
from uuid import uuid4
from typing import Dict, List, Optional
from datetime import datetime
# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Global task map (key = target_id, value = task details)
task_map: Dict[str, Dict] = {}
# Lock protecting task_map across threads
task_map_lock = threading.Lock()

# Cluster resources
cluster_resources: Dict[str, Dict] = {
    # modelarts cluster, ID 1790300942428540928
    "1790300942428540928": {
        "total": {"CPU": 96, "MEMORY": 1024, "NPU": 4},
        "available": {"CPU": 48, "MEMORY": 512, "NPU": 2}
    },
    # openi cluster, ID 1865927992266461184
    "1865927992266461184": {
        "total": {"CPU": 48, "MEMORY": 512, "NPU": 1},
        "available": {"CPU": 24, "MEMORY": 256, "NPU": 1}
    },
    # octopus-189 (Octopus) cluster, ID 1865927992266462181
    "1865927992266462181": {
        "total": {"CPU": 48, "MEMORY": 512, "NPU": 1},
        "available": {"CPU": 24, "MEMORY": 256, "NPU": 1}
    },
    # shuguangAi (Sugon) cluster, ID 1777240145309732864
    "1777240145309732864": {
        "total": {"CPU": 48, "MEMORY": 512, "NPU": 1},
        "available": {"CPU": 24, "MEMORY": 256, "NPU": 1}
    },
}
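
# --- Illustrative sketch (not wired into the flow below). The script only
# compares requests against "available" and never decrements it; if the
# simulator should also account for consumed capacity, a checked allocator
# could look like this. Not thread-safe on its own: in the threaded flow
# below it would need its own lock around cluster_resources. ---
def try_allocate(cluster_id: str, request: Dict[str, int]) -> bool:
    """Reserve CPU/MEMORY/NPU from one cluster's available pool; False if it doesn't fit."""
    avail = cluster_resources[cluster_id]["available"]
    if any(request[k] > avail[k] for k in ("CPU", "MEMORY", "NPU")):
        return False
    for k in ("CPU", "MEMORY", "NPU"):
        avail[k] -= request[k]
    return True

def release(cluster_id: str, request: Dict[str, int]) -> None:
    """Return a finished task's resources to the cluster's available pool."""
    avail = cluster_resources[cluster_id]["available"]
    for k in ("CPU", "MEMORY", "NPU"):
        avail[k] += request[k]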

# Dataset
class DatasetInfo(Dict):
    id: str                          # Unique dataset identifier
    name: str                        # Dataset name
    size: float                      # Dataset size in bytes
    status: str                      # Status (e.g. "uploaded")
    is_uploaded: bool                # Whether the dataset has been uploaded
    upload_time: Optional[datetime]  # Upload time (None if not yet uploaded)
    description: Optional[str]       # Optional description

# Algorithm
class AlgorithmInfo(Dict):
    id: str                          # Unique algorithm identifier
    name: str                        # Algorithm name
    size: float                      # Algorithm size in bytes
    status: str                      # Status (e.g. "uploaded")
    is_uploaded: bool                # Whether the algorithm has been uploaded
    upload_time: Optional[datetime]  # Upload time (None if not yet uploaded)
    description: Optional[str]       # Optional description

# Per-cluster data
class ClusterDatas(Dict):
    datasets: Dict[str, DatasetInfo]      # Keyed by dataset ID
    algorithms: Dict[str, AlgorithmInfo]  # Keyed by algorithm ID

class TaskTemplate(Dict):
    """Cluster task template. Subclasses Dict so instances support both
    dictionary operations and the type annotations below."""
    # Core identifiers
    target_id: str     # Unique task identifier
    task_name: str     # Task name
    package_name: str  # Task package name
    # Associated resources
    dataset_name: str  # Name of the associated dataset
    code_Id: str       # Associated algorithm ID (field name kept from the original template)
    # Resource request (nested dict: CPU/MEMORY/NPU)
    resource: Dict[str, int]  # Keys are "CPU"/"MEMORY"/"NPU", values are integers
    # Task state and timing
    status: str        # Task status (e.g. "submitted")
    submit_time: str   # Submission time as a string, e.g. "2025-07-15 19:42:03"
    # Extra information
    file_location: str # Path where the task files are stored
    error_msg: str     # Error message (empty means no error)
    retry_count: int   # Current retry count
    max_retries: int   # Maximum number of retries

    def __init__(self, **kwargs):
        default_values = {
            "target_id": "",
            "task_name": "",
            "package_name": "",
            "dataset_name": "",
            "code_Id": "",
            "resource": {"CPU": 0, "MEMORY": 0, "NPU": 0},  # Integer resource defaults
            "status": "submitted",  # Initial state: awaiting submission
            "submit_time": "",
            "file_location": "",
            "error_msg": "",
            "retry_count": 0,
            "max_retries": 5
        }
        default_values.update(kwargs)  # Let callers override the defaults
        # Initialize the underlying Dict so task_template["status"] works
        super().__init__(default_values)
        # Also bind the fields as instance attributes so task_template.status works
        for key, value in default_values.items():
            setattr(self, key, value)
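
# Illustrative usage of TaskTemplate (both access styles work):
#
#     t = TaskTemplate(task_name="demo")   # kwargs override the defaults
#     t["retry_count"] += 1                # dict-style access
#     print(t.status)                      # attribute-style access -> "submitted"
#
# Caveat: after construction the two views are independent; assigning to
# t.status does not update t["status"] (and vice versa), so pick one style
# per field. The rest of this script uses dict-style access throughout.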

class TaskMonitorThread(threading.Thread):
    """Task monitor thread: polls task statuses and resets retryable tasks."""

    def __init__(self, check_interval: int = 30, name: Optional[str] = None):
        super().__init__(name=name or "TaskMonitorThread")
        self.check_interval = check_interval  # Polling interval in seconds
        self._stop_event = threading.Event()
        self.all_tasks_completed = threading.Event()  # Signals that every task has finished

    def run(self) -> None:
        logger.info(f"Monitor thread started | polling interval: {self.check_interval}s | thread ID: {self.ident}")
        while not self._stop_event.is_set():
            with task_map_lock:
                all_completed = True  # Whether every task has reached a final state
                retry_tasks = []      # Tasks eligible for retry
                # Walk all tasks and check their status
                for task in task_map.values():
                    status = task["status"]
                    # Final states (success, or retries already exhausted): nothing to do
                    if status in ["succeed", "retry_exhausted"]:
                        continue
                    # Unfinished: check whether the task can be retried
                    if status in ["failed", "error"]:
                        if task["retry_count"] < task["max_retries"]:
                            retry_tasks.append(task)  # Queue for retry
                            all_completed = False
                        else:
                            # Retry budget spent: mark as a final state
                            task["status"] = "retry_exhausted"
                            logger.warning(f"Task {task['task_name']} hit the retry limit ({task['max_retries']}); no further retries")
                    else:
                        # Any other state (e.g. submitted) counts as unfinished
                        all_completed = False
                # Reset retryable tasks back to "submitted"
                if retry_tasks:
                    logger.info(f"Found {len(retry_tasks)} retryable task(s); resetting status to submitted")
                    for task in retry_tasks:
                        task["status"] = "submitted"
                        task["retry_count"] += 1
                        logger.info(
                            f"Task {task['task_name']} (target_id: {task['target_id']}) | "
                            f"retries: {task['retry_count']}/{task['max_retries']} | status reset"
                        )
                # Every task reached a final state: tell the submit thread to stop
                if all_completed:
                    logger.info("All tasks reached a final state (succeeded or retries exhausted)")
                    self.all_tasks_completed.set()
                    break
            # Wait for the next polling round
            self._stop_event.wait(self.check_interval)
        logger.info(f"Monitor thread finished | thread ID: {self.ident}")

    def stop(self) -> None:
        """Stop the monitor thread."""
        self._stop_event.set()

class TaskSubmitThread(threading.Thread):
    """Task submit thread: submits pending tasks in a loop and reacts to the
    monitor thread's completion signal."""

    def __init__(self, max_workers: int = 3, *, monitor_thread: "TaskMonitorThread", name: Optional[str] = None):
        super().__init__(name=name or "TaskSubmitThread")
        self.max_workers = max_workers        # Number of concurrent submissions
        self.monitor_thread = monitor_thread  # Linked monitor thread; provides the completion signal
        self._stop_event = threading.Event()

    def run(self) -> None:
        logger.info(f"Submit thread started | concurrency: {self.max_workers} | thread ID: {self.ident}")
        try:
            # Keep submitting until the monitor reports that every task has finished
            while not self._stop_event.is_set() and not self.monitor_thread.all_tasks_completed.is_set():
                # 1. Query cluster resources
                available_resources = search_resource()
                if not available_resources:
                    logger.error("No cluster resource information available; retrying in 10s...")
                    self._stop_event.wait(10)
                    continue
                # 2. Submit the tasks whose resource requests fit
                commit_tasks(available_resources, self.max_workers)
                # 3. Wait before the next pass (avoids hammering the resource query)
                self._stop_event.wait(5)  # Re-check the task queue after 5 seconds
        except Exception as e:
            logger.error(f"Submit thread terminated abnormally: {str(e)}", exc_info=True)
        finally:
            logger.info(f"Submit thread finished | thread ID: {self.ident}")

    def stop(self) -> None:
        """Stop the submit thread."""
        self._stop_event.set()

def generate_tasks() -> List[Dict]:
    """Generate the list of task templates (with per-task variations)."""
    base_tasks = [
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AA",
            "dataset_name": "data1.zip",
            "code_Id": 1,
            "file_location": "D:/数据集/cnn数据集/data1/",
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AB",
            "dataset_name": "cifar-10-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data2/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AC",
            "dataset_name": "cifar-100-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data3/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AD",
            "dataset_name": "cifar-100-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data3/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AE",
            "dataset_name": "dev.jsonl",
            "file_location": "D:/数据集/transfomer数据集/BoolQ/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AF",
            "dataset_name": "ceval.zip",
            "file_location": "D:/数据集/transfomer数据集/CEval/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AG",
            "dataset_name": "CMMLU.zip",
            "file_location": "D:/数据集/transfomer数据集/CMMLU/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AH",
            "dataset_name": "mental_health.csv",
            "file_location": "D:/数据集/transfomer数据集/GLUE(imdb)/imdb/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AI",
            "dataset_name": "GSM8K.jsonl",
            "file_location": "D:/数据集/transfomer数据集/GSM8K/GSM8K/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AJ",
            "dataset_name": "human-eval.jsonl",
            "file_location": "D:/数据集/transfomer数据集/HumanEval/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        },
        {
            "task_name_template": "{prefix}-jointCloudAi-trainingtask",
            "prefix": "AK",
            "dataset_name": "HumanEval_X.zip",
            "file_location": "D:/数据集/transfomer数据集/HumanEval_X/",
            "code_Id": 1,
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        }
    ]
    logger.info(f"Generated {len(base_tasks)} task templates")
    return base_tasks

# Base task template dict (mirrors the TaskTemplate fields above; copied per
# task by read_tasks)
task_template = {
    "target_id": "",
    "task_name": "",
    "package_name": "",
    "dataset_name": "",
    "code_Id": "",
    "resource": {
        "CPU": 0,
        "MEMORY": 0,
        "NPU": 0
    },
    "status": "submitted",
    "submit_time": "",
    "file_location": "",
    "error_msg": "",
    "retry_count": 0,
    "max_retries": 5
}

def read_tasks(templates: List[Dict]) -> None:
    """Turn task templates into submittable task dicts (includes the retry fields)."""
    global task_map
    with task_map_lock:
        task_map = {}  # Drop any previous tasks
        if not templates:
            logger.warning("Task template list is empty; skipping task creation")
            return
        for template in templates:
            try:
                # Start from a fresh copy of the base template
                task = task_template.copy()
                target_id = str(uuid4())  # Unique task ID
                submit_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                # Fill in the task fields
                task["target_id"] = target_id
                task["task_name"] = template["task_name_template"].format(prefix=template["prefix"])
                task["package_name"] = f"{template['prefix'].lower()}-training-pkg"
                task["dataset_name"] = template["dataset_name"]
                task["code_Id"] = template["code_Id"]
                task["resource"] = {
                    "CPU": template["CPU"],
                    "MEMORY": template["MEMORY"],
                    "NPU": template["NPU"]
                }
                task["status"] = "submitted"
                task["submit_time"] = submit_time_str
                task["file_location"] = template["file_location"]
                task_map[target_id] = task
                logger.info(f"Task created | task_name: {task['task_name']} | target_id: {target_id}")
            except KeyError as e:
                logger.error(f"Task template is missing a field: {e} | template: {template}")
            except Exception as e:
                logger.error(f"Failed to create task: {str(e)} | template: {template}")

def search_resource() -> Dict[str, Dict]:
    """Query cluster resources (returns the global per-cluster dict with total
    and available resources)."""
    logger.info(f"Cluster resources: {cluster_resources}")
    return cluster_resources

def get_token() -> Optional[str]:
    """Fetch an authentication token."""
    login_url = "http://119.45.255.234:30180/jcc-admin/admin/login"
    login_payload = {"username": "admin", "password": "Nudt@123"}
    try:
        response = requests.post(login_url, json=login_payload, timeout=10)
        response.raise_for_status()
        result = response.json()
        if result.get("code") == 200 and "data" in result and "token" in result["data"]:
            logger.info("Token acquired")
            return result["data"]["token"]
        else:
            logger.error(f"Failed to acquire token | response: {result}")
            return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Login request failed: {str(e)}", exc_info=True)
        return None
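
# --- Illustrative sketch: submit_single_task fetches a fresh token for every
# task. If the token stays valid for a while (an assumption; the real expiry
# of the jcc-admin token is not known here), a small time-based cache would
# save one login round-trip per submission. Not used by the code below. ---
_token_cache = {"token": None, "fetched_at": 0.0}
_TOKEN_TTL_SECONDS = 300  # Assumed lifetime; adjust to the service's real expiry

def get_token_cached() -> Optional[str]:
    """Return a cached token, refreshing it via get_token() once the TTL lapses."""
    now = time.time()
    if _token_cache["token"] and now - _token_cache["fetched_at"] < _TOKEN_TTL_SECONDS:
        return _token_cache["token"]
    token = get_token()
    if token:
        _token_cache["token"] = token
        _token_cache["fetched_at"] = now
    return token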

def submit_single_task(task: Dict) -> bool:
    """Submit one task to the cluster; on failure, set its status to failed/error."""
    token = get_token()
    if not token:
        with task_map_lock:
            task["status"] = "failed"
            task["error_msg"] = "Failed to acquire token"
        return False
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}'
    }
    task_name = task["task_name"]
    package_name = task["package_name"]
    file_name = task["dataset_name"]
    file_location = task["file_location"]
    son_code_Id = task["code_Id"]
    file_path = os.path.join(file_location, file_name)
    try:
        # Step 1: create the dataset package (folder)
        create_url = "http://119.45.255.234:30180/jsm/jobSet/createPackage"
        create_payload = {
            "userID": 5,
            "name": package_name,
            "dataType": "dataset",
            "packageID": 0,
            "uploadPriority": {"type": "specify", "clusters": ["1790300942428540928"]},
            "bindingInfo": {
                "clusterIDs": ["1790300942428540928"],
                "name": package_name,
                "category": "image",
                "type": "dataset",
                "imageID": "",
                "bias": [],
                "region": [],
                "chip": ["ASCEND"],
                "selectedCluster": [],
                "modelType": "",
                "env": "",
                "version": "",
                "packageID": 0,
                "points": 0
            }
        }
        create_resp = requests.post(create_url, json=create_payload, headers=headers, timeout=15)
        create_resp.raise_for_status()
        create_result = create_resp.json()
        if create_result.get("code") != 200:
            raise ValueError(f"Package creation failed | API response: {create_result}")
        packageID = create_result["data"]["newPackage"]["packageID"]
        logger.info(f"[{task_name}] Step 1: package created | packageID: {packageID}")
        # Step 3: upload the dataset file
        upload_url = "http://119.45.255.234:30180/jcs/object/upload"
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Dataset file not found | path: {file_path}")
        info_data = {
            "userID": 5,
            "packageID": packageID,
            "loadTo": [3],
            "loadToPath": [f"/dataset/5/{package_name}/"]
        }
        file_headers = {'Authorization': f'Bearer {token}'}
        with open(file_path, 'rb') as f:
            form_data = {"info": (None, json.dumps(info_data)), "files": f}
            upload_resp = requests.post(upload_url, files=form_data, headers=file_headers, timeout=300)
        upload_resp.raise_for_status()
        upload_result = upload_resp.json()
        if upload_result.get("code") != 200:
            raise ValueError(f"File upload failed | API response: {upload_result}")
        object_id = upload_result["data"]["uploadeds"][0]["objectID"]
        logger.info(f"[{task_name}] Step 3: file uploaded | objectID: {object_id}")
        # Step 4: notify that the upload is complete
        notify_url = "http://119.45.255.234:30180/jsm/jobSet/notifyUploaded"
        notify_payload = {
            "userID": 5,
            "packageID": packageID,
            "uploadParams": {
                "dataType": "dataset",
                "uploadInfo": {"type": "local", "localPath": file_name, "objectIDs": [object_id]}
            }
        }
        notify_resp = requests.post(notify_url, json=notify_payload, headers=headers, timeout=15)
        notify_resp.raise_for_status()
        notify_result = notify_resp.json()
        if notify_result.get("code") != 200:
            raise ValueError(f"Upload-complete notification failed | API response: {notify_result}")
        logger.info(f"[{task_name}] Step 4: upload-complete notification sent")
        # Step 7: bind the dataset to the cluster
        bind_url = "http://119.45.255.234:30180/jsm/jobSet/binding"
        bind_payload = {
            "userID": 5,
            "info": {"type": "dataset", "packageID": packageID, "clusterIDs": ["1790300942428540928"]}
        }
        bind_resp = requests.post(bind_url, json=bind_payload, headers=headers, timeout=15)
        bind_resp.raise_for_status()
        bind_result = bind_resp.json()
        if bind_result.get("code") != 200:
            raise ValueError(f"Cluster binding failed | API response: {bind_result}")
        logger.info(f"[{task_name}] Step 7: dataset bound to cluster")
        # Step 8: look up the binding ID
        query_bind_url = "http://119.45.255.234:30180/jsm/jobSet/queryBinding"
        query_bind_payload = {
            "dataType": "dataset",
            "param": {"userID": 5, "bindingID": -1, "type": "private"}
        }
        query_bind_resp = requests.post(query_bind_url, json=query_bind_payload, headers=headers, timeout=15).json()
        if query_bind_resp.get("code") != 200:
            raise ValueError(f"Binding query failed | API response: {query_bind_resp}")
        # Find the binding that matches this package
        binding_id = None
        for data in query_bind_resp["data"]["datas"]:
            if data["info"]["name"] == package_name:
                binding_id = data["ID"]
                break
        if not binding_id:
            raise ValueError(f"No binding ID found for package_name={package_name}")
        logger.info(f"[{task_name}] Step 8: binding ID resolved | binding_id: {binding_id}")
        # Step 9: submit the training job
        submit_url = "http://119.45.255.234:30180/jsm/jobSet/submit"
        task_res = task["resource"]
        submit_payload = {
            "userID": 5,
            "jobSetInfo": {
                "jobs": [
                    {
                        "localJobID": "1",
                        "name": task_name,
                        "description": "Auto-submitted CNN training task",
                        "type": "AI",
                        "files": {
                            "dataset": {"type": "Binding", "bindingID": binding_id},
                            "model": {"type": "Binding", "bindingID": 421},
                            "image": {"type": "Image", "imageID": 11}
                        },
                        "jobResources": {
                            "scheduleStrategy": "dataLocality",
                            "clusters": [
                                {
                                    "clusterID": "1790300942428540928",
                                    "runtime": {"envs": {}, "params": {}},
                                    "code": {"type": "Binding", "bindingID": son_code_Id},
                                    "resources": [
                                        {"type": "CPU", "name": "ARM", "number": task_res["CPU"]},
                                        {"type": "MEMORY", "name": "RAM", "number": task_res["MEMORY"]},
                                        {"type": "MEMORY", "name": "VRAM", "number": 32},
                                        {"type": "STORAGE", "name": "DISK", "number": 32},
                                        {"type": "NPU", "name": "ASCEND910", "number": task_res["NPU"]}
                                    ]
                                }
                            ]
                        }
                    },
                    {"localJobID": "4", "type": "DataReturn", "targetLocalJobID": "1"}
                ]
            }
        }
        submit_resp = requests.post(submit_url, json=submit_payload, headers=headers, timeout=15).json()
        if submit_resp.get("code") != 200:
            raise ValueError(f"Job submission failed | API response: {submit_resp}")
        logger.info(f"[{task_name}] Step 9: job submitted | jobSetID: {submit_resp.get('data', {}).get('jobSetID')}")
        # Mark the task as succeeded (thread-safe)
        with task_map_lock:
            task["status"] = "succeed"
        return True
    except Exception as e:
        error_msg = f"Submission failed: {str(e)}"
        with task_map_lock:
            # Check whether the retry budget is already spent
            if task["retry_count"] >= task["max_retries"]:
                task["status"] = "retry_exhausted"
            else:
                task["status"] = "failed"  # Below the retry limit: mark failed and wait for a retry
            task["error_msg"] = error_msg
        logger.error(f"[{task_name}] {error_msg}", exc_info=True)
        return False
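
# --- Illustrative sketch (never called): exercising submit_single_task
# offline by stubbing out the HTTP and filesystem layers. The canned JSON
# bodies are fabricated fixtures shaped like the fields the function reads;
# they are NOT real responses from the jcc/jsm services. ---
def _dry_run_submit_example(task: Dict) -> bool:
    from unittest import mock

    def fake_post(url, *args, **kwargs):
        resp = mock.Mock()
        resp.raise_for_status = lambda: None
        if "login" in url:
            body = {"code": 200, "data": {"token": "fake-token"}}
        elif "createPackage" in url:
            body = {"code": 200, "data": {"newPackage": {"packageID": 1}}}
        elif "object/upload" in url:
            body = {"code": 200, "data": {"uploadeds": [{"objectID": 1}]}}
        elif "queryBinding" in url:
            body = {"code": 200, "data": {"datas": [{"ID": 99, "info": {"name": task["package_name"]}}]}}
        else:  # notifyUploaded / binding / submit
            body = {"code": 200, "data": {"jobSetID": 1}}
        resp.json = lambda: body
        return resp

    # Stub the network, the existence check, and the file handle
    with mock.patch.object(requests, "post", side_effect=fake_post), \
         mock.patch.object(os.path, "exists", return_value=True), \
         mock.patch("builtins.open", mock.mock_open(read_data=b"stub")):
        return submit_single_task(task)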

def commit_tasks(available_resources: Dict[str, Dict], max_workers: int = 3) -> None:
    """Concurrently submit every pending task whose resource request fits.

    available_resources is the full per-cluster map from search_resource();
    eligibility is checked against the "available" pool of the cluster that
    submit_single_task targets.
    """
    # submit_single_task hard-codes this cluster, so gate on its available pool
    target_cluster_avail = available_resources["1790300942428540928"]["available"]
    with task_map_lock:
        if not task_map:
            logger.warning("No tasks to submit; leaving the submit flow")
            return
        current_tasks = list(task_map.values())  # Snapshot so workers can't mutate the list mid-iteration
    # Select submittable tasks: status is "submitted" and the resource request fits
    eligible_tasks = []
    for task in current_tasks:
        if task["status"] != "submitted":
            logger.info(f"Task {task['task_name']} has status {task['status']}; skipping")
            continue
        task_res = task["resource"]
        # CPU, memory, and NPU must all fit
        if (task_res["CPU"] <= target_cluster_avail["CPU"] and
                task_res["MEMORY"] <= target_cluster_avail["MEMORY"] and
                task_res["NPU"] <= target_cluster_avail["NPU"]):
            eligible_tasks.append(task)
            logger.info(f"Task {task['task_name']} fits the available resources; queued for submission")
        else:
            logger.warning(f"Task {task['task_name']} does not fit | requested: {task_res} | available: {target_cluster_avail}")
    if not eligible_tasks:
        logger.info("No task fits the available resources; submit flow done")
        return
    # Submit concurrently. This must run outside task_map_lock: the workers
    # acquire the same (non-reentrant) lock when updating task status.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(submit_single_task, task): task for task in eligible_tasks}
        for future in concurrent.futures.as_completed(futures):
            task = futures[future]
            try:
                future.result()  # Re-raise any exception from the worker
            except Exception as e:
                with task_map_lock:
                    task["status"] = "error"
                    task["error_msg"] = f"Worker exception: {str(e)}"
                logger.error(f"Task {task['task_name']} raised: {str(e)}", exc_info=True)

if __name__ == "__main__":
    # 1. Build the static task definitions
    task_templates = generate_tasks()
    # 2. Turn them into the global task_map (includes the retry fields)
    read_tasks(task_templates)
    # 3. Start the status-monitor thread (30s polling interval; adjustable)
    monitor_thread = TaskMonitorThread(check_interval=30)
    monitor_thread.start()
    # 4. Start the submit thread, linked to the monitor (concurrency 30)
    submit_thread = TaskSubmitThread(max_workers=30, monitor_thread=monitor_thread)
    submit_thread.start()
    # 5. Wait for both threads to finish
    monitor_thread.join()
    submit_thread.join()
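    # Illustrative only: when running interactively, the joins above could be
    # wrapped so Ctrl-C asks both threads to stop instead of leaving them
    # running, e.g.:
    #
    #     try:
    #         monitor_thread.join()
    #         submit_thread.join()
    #     except KeyboardInterrupt:
    #         submit_thread.stop()
    #         monitor_thread.stop()
    #         submit_thread.join()
    #         monitor_thread.join()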