SchedulingSimulator/readTasks_queue.py
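"""Batch dataset-upload driver for the scheduling simulator.

Fetches an admin token from the jcc-admin service, generates 21 training
tasks (3 differentiated AA/AB/AC tasks plus 18 repeated AD tasks), and runs
each task's workflow on a thread pool: create a package folder, upload the
dataset file, confirm the upload, bind clusters, and look up the binding ID.
Finished tasks are pushed onto a global queue for follow-up processing.
Dataset files are expected at the local Windows paths configured in
generate_tasks().
"""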


import requests
import json
import concurrent.futures
import time
import queue
import logging
from uuid import uuid4
from typing import Dict, List, Optional
# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Global task queue shared between the upload workers and the queue processor
task_queue = queue.Queue()
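# NOTE: queue.Queue is thread-safe, so the ThreadPoolExecutor workers below
# can put() into it concurrently without any extra locking.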
def get_token() -> Optional[str]:
    """Log in and fetch an auth token."""
    login_url = "http://119.45.255.234:30180/jcc-admin/admin/login"
    login_payload = {"username": "admin", "password": "Nudt@123"}
    try:
        response = requests.post(login_url, json=login_payload, timeout=10)
        response.raise_for_status()
        result = response.json()
        if result.get("code") == 200 and "data" in result and "token" in result["data"]:
            logger.info("✅ Login succeeded, token obtained")
            return result["data"]["token"]
        logger.error(f"❌ Login failed, response: {result}")
        return None
    except requests.RequestException as e:
        logger.error(f"❌ Login request error: {e}", exc_info=True)
        return None

def generate_tasks() -> List[Dict]:
    """Generate the task list (avoids duplicating task literals)."""
    tasks = []
    # Base templates for the first 3 differentiated tasks
    base_tasks = [
        {
            "prefix": "AA",
            "dataset_name": "data1.zip",
            "file_location": "D:/数据集/cnn数据集/data1/"
        },
        {
            "prefix": "AB",
            "dataset_name": "cifar-10-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data2/"
        },
        {
            "prefix": "AC",
            "dataset_name": "cifar-100-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data3/"
        }
    ]
    # Add the differentiated tasks
    for bt in base_tasks:
        task_uuid = uuid4().hex[:8]  # 8-character unique ID
        tasks.append({
            "task_name": f"{bt['prefix']}-octops-trainingtask-{task_uuid}",
            "code_Id": 768,
            "package_name": f"dataset-v1-{task_uuid}",
            "dataset_name": bt["dataset_name"],
            "file_location": bt["file_location"],
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        })
    # Add 18 AD tasks that all share one template
    ad_count = 18
    for _ in range(ad_count):
        task_uuid = uuid4().hex[:8]
        tasks.append({
            "task_name": f"AD-octops-trainingtask-{task_uuid}",
            "code_Id": 768,
            "package_name": f"dataset-v4-{task_uuid}",
            "dataset_name": "data1.zip",
            "file_location": "D:/数据集/cnn数据集/data1/",
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        })
    logger.info(f"📋 Total tasks generated: {len(tasks)}")
    return tasks

def process_task_queue():
    """Drain the task queue (extension point for follow-up business logic)."""
    logger.info("🚀 Starting task queue processor")
    while True:
        try:
            # 10-second timeout so an empty queue cannot block forever
            task_info = task_queue.get(timeout=10)
            logger.info(f"🔄 Processing queued task: {task_info}")
            # TODO: add follow-up handling for queued tasks here, e.g.
            # submit to the scheduling system, save to a database, send notifications
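            # A minimal sketch of what that could look like (hypothetical
            # helper names, kept commented out so behaviour is unchanged):
            #   submit_to_scheduler(task_info)    # hand off to the scheduling system
            #   save_to_database(task_info)       # persist for auditing/replay
            #   send_notification(task_info["task_name"])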
            task_queue.task_done()  # mark the queued item as processed
        except queue.Empty:
            logger.info("📦 Queue is empty, nothing left to process")
            break
    logger.info("🛑 Task queue processor finished")

def main(
    token: str,
    package_name: str,
    dataset_name: str,
    file_location: str,
    code_Id: int,
    task_name: str
):
    """Run the full workflow for a single task."""
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}'
    }
    logger.info(f"📌 Starting task: {task_name} (package: {package_name})")
    # ---------------------- Step 2: create the package folder ----------------------
    try:
        create_url = "http://119.45.255.234:30180/jsm/jobSet/createPackage"
        create_payload = {
            "userID": 5,
            "name": package_name,
            "dataType": "dataset",
            "packageID": 0,
            "uploadPriority": {"type": "specify", "clusters": []},
            "bindingInfo": {
                "clusterIDs": [], "name": package_name, "category": "image",
                "type": "dataset", "imageID": "", "bias": [], "region": [],
                "chip": ["ASCEND"], "selectedCluster": [], "modelType": "",
                "env": "", "version": "", "packageID": 0, "points": 0
            }
        }
        response = requests.post(create_url, json=create_payload, headers=headers, timeout=15)
        response.raise_for_status()
        result = response.json()
        package_id = result['data']['newPackage']['packageID']
        logger.info(f"✅ Step 2: package folder created (packageID: {package_id})")
    except Exception as e:
        logger.error(f"❌ Step 2: creating package folder failed: {e}", exc_info=True)
        return
    # ---------------------- Step 3: query the dataset list ----------------------
    try:
        query_url = "http://119.45.255.234:30180/jsm/jobSet/queryUploaded"
        query_payload = {
            "queryParams": {
                "dataType": "dataset", "userID": 5, "packageID": package_id,
                "path": "", "CurrentPage": 1, "pageSize": 10, "orderBy": "name"
            }
        }
        response = requests.post(query_url, json=query_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ Step 3: dataset list queried")
    except Exception as e:
        logger.error(f"❌ Step 3: querying dataset list failed: {e}", exc_info=True)
        return
    # ---------------------- Step 4: upload the file ----------------------
    try:
        upload_url = "http://121.36.5.116:32010/object/upload"
        file_path = f"{file_location}{dataset_name}"
        info_data = {
            "userID": 5, "packageID": package_id, "loadTo": [],
            "loadToPath": [f"/dataset/5/{package_name}/"]
        }
        # Open inside a with-block so the handle is closed even if the upload fails
        with open(file_path, 'rb') as f:
            form_data = {
                # (filename=None) makes requests send this part as a plain form field
                "info": (None, json.dumps(info_data)),
                "files": f
            }
            response = requests.post(
                upload_url,
                files=form_data,
                headers={'Authorization': f'Bearer {token}'},
                timeout=300  # 5-minute timeout for large file uploads
            )
        response.raise_for_status()
        result = response.json()["data"]["uploadeds"][0]
        object_id = result["objectID"]
        logger.info(f"✅ Step 4: file uploaded (objectID: {object_id})")
    except FileNotFoundError:
        logger.error(f"❌ Step 4: file not found: {file_path}")
        return
    except Exception as e:
        logger.error(f"❌ Step 4: file upload failed: {e}", exc_info=True)
        return
    # ---------------------- Step 5: notify upload complete ----------------------
    try:
        notify_url = "http://119.45.255.234:30180/jsm/jobSet/notifyUploaded"
        notify_payload = {
            "userID": 5, "packageID": package_id,
            "uploadParams": {
                "dataType": "dataset",
                "uploadInfo": {"type": "local", "localPath": dataset_name, "objectIDs": [object_id]}
            }
        }
        response = requests.post(notify_url, json=notify_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ Step 5: upload-complete notification sent")
    except Exception as e:
        logger.error(f"❌ Step 5: notifying upload complete failed: {e}", exc_info=True)
        return
    # ---------------------- Step 6: re-query the upload result ----------------------
    try:
        query_url = "http://119.45.255.234:30180/jsm/jobSet/queryUploaded"
        query_payload = {
            "queryParams": {
                "dataType": "dataset", "userID": 5, "packageID": package_id,
                "path": "", "CurrentPage": 1, "pageSize": 10, "orderBy": "name"
            }
        }
        response = requests.post(query_url, json=query_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ Step 6: upload result re-queried")
    except Exception as e:
        logger.error(f"❌ Step 6: re-querying upload result failed: {e}", exc_info=True)
        return
    # ---------------------- Step 7: query the upload status ----------------------
    try:
        status_url = "http://119.45.255.234:30180/jsm/jobSet/uploadStatus"
        status_payload = {
            "userID": 5, "operate": "query", "packageID": package_id, "dataType": "dataset"
        }
        response = requests.post(status_url, json=status_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ Step 7: upload status queried")
    except Exception as e:
        logger.error(f"❌ Step 7: querying upload status failed: {e}", exc_info=True)
        return
    # ---------------------- Step 8: bind clusters ----------------------
    try:
        binding_url = "http://119.45.255.234:30180/jsm/jobSet/binding"
        binding_payload = {
            "userID": 5,
            "info": {"type": "dataset", "packageID": package_id, "clusterIDs": []}
        }
        response = requests.post(binding_url, json=binding_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ Step 8: clusters bound")
    except Exception as e:
        logger.error(f"❌ Step 8: binding clusters failed: {e}", exc_info=True)
        return
    # ---------------------- Step 9: query the binding result ----------------------
    target_id = None
    try:
        query_binding_url = "http://119.45.255.234:30180/jsm/jobSet/queryBinding"
        query_binding_payload = {
            "dataType": "dataset",
            "param": {"userID": 5, "bindingID": -1, "type": "private"}
        }
        response = requests.post(query_binding_url, json=query_binding_payload, headers=headers, timeout=15)
        response.raise_for_status()
        result = response.json()
        # Extract the ID of the package we just created
        for data in result.get("data", {}).get("datas", []):
            if data.get("info", {}).get("name") == package_name:
                target_id = data.get("ID")
                break
        if target_id:
            logger.info(f"✅ Step 9: binding result found (target_id: {target_id})")
        else:
            logger.warning(f"⚠️ Step 9: no binding ID found for {package_name}")
            return
    except Exception as e:
        logger.error(f"❌ Step 9: querying binding result failed: {e}", exc_info=True)
        return
    # ---------------------- After step 9: enqueue the task ----------------------
    task_info = {
        "task_name": task_name,
        "package_name": package_name,
        "dataset_name": dataset_name,
        "code_Id": code_Id,
        "target_id": target_id,
        "status": "submitted",
        "timestamp": time.time()
    }
    task_queue.put(task_info)
    logger.info(f"📥 Task {task_name} enqueued (queue size: {task_queue.qsize()})")

if __name__ == "__main__":
    # 1. Fetch the token
    token = get_token()
    if not token:
        logger.error("💥 Could not obtain a token, exiting")
        exit(1)
    # 2. Generate the task list
    tasks = generate_tasks()
    # 3. Run the tasks concurrently, capped at 5 workers
    max_workers = 5
    logger.info(f"🔧 Starting thread pool with {max_workers} workers")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                main,
                token=token,
                package_name=task["package_name"],
                dataset_name=task["dataset_name"],
                file_location=task["file_location"],
                code_Id=task["code_Id"],
                task_name=task["task_name"]
            ) for task in tasks
        ]
        # Wait for every task to finish and surface any exceptions
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"💥 Task raised an exception: {e}", exc_info=True)
    # 4. Drain the queued tasks
    process_task_queue()
    logger.info("All tasks processed")