"""Batch dataset-upload orchestrator.

Logs in to the admin service, generates a list of training-task descriptors,
and for each task runs a nine-step flow (create package folder, upload the
dataset file, notify/verify the upload, bind clusters, resolve the binding ID).
Successfully submitted tasks are pushed onto a global queue which is drained
after all workers finish. Tasks run concurrently on a small thread pool.
"""
import concurrent.futures
import json
import logging
import queue
import sys
import time
from typing import Dict, List, Optional
from uuid import uuid4

import requests

# Logging configuration: timestamped INFO-level messages to stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Global task queue: worker threads enqueue submitted-task records here;
# process_task_queue() drains it after the pool finishes.
task_queue = queue.Queue()


def get_token() -> Optional[str]:
    """Log in to the admin endpoint and return a bearer token.

    Returns:
        The token string on success, or None on any login / network failure
        (the failure is logged, never raised).
    """
    login_url = "http://119.45.255.234:30180/jcc-admin/admin/login"
    # SECURITY NOTE: hard-coded admin credentials — move these to environment
    # variables or a secrets store before sharing/deploying this script.
    login_payload = {"username": "admin", "password": "Nudt@123"}
    try:
        response = requests.post(login_url, json=login_payload, timeout=10)
        response.raise_for_status()
        result = response.json()

        if result.get("code") == 200 and "data" in result and "token" in result["data"]:
            logger.info("✅ 登录成功,获取到Token")
            return result["data"]["token"]
        logger.error(f"❌ 登录失败,响应: {result}")
        return None
    except requests.RequestException as e:
        logger.error(f"❌ 登录请求异常: {e}", exc_info=True)
        return None


def generate_tasks() -> List[Dict]:
    """Build the full task list (3 differentiated tasks + 18 repeated AD tasks).

    Each task dict carries the names, dataset location and resource quota
    consumed by main(). Task names embed an 8-char uuid4 fragment so repeated
    runs produce unique package/task names.
    """
    tasks = []
    # Base templates for the three differentiated tasks (AA/AB/AC).
    base_tasks = [
        {
            "prefix": "AA",
            "dataset_name": "data1.zip",
            "file_location": "D:/数据集/cnn数据集/data1/"
        },
        {
            "prefix": "AB",
            "dataset_name": "cifar-10-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data2/"
        },
        {
            "prefix": "AC",
            "dataset_name": "cifar-100-python.tar.gz",
            "file_location": "D:/数据集/cnn数据集/data3/"
        }
    ]

    # Expand the differentiated templates into full task records.
    for bt in base_tasks:
        task_uuid = uuid4().hex[:8]  # 8-char unique suffix
        tasks.append({
            "task_name": f"{bt['prefix']}-octops-trainingtask-{task_uuid}",
            "code_Id": 768,
            "package_name": f"dataset-v1-{task_uuid}",
            "dataset_name": bt["dataset_name"],
            "file_location": bt["file_location"],
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        })

    # Add 18 identical-pattern "AD" tasks (same dataset, unique names).
    ad_count = 18
    for _ in range(ad_count):
        task_uuid = uuid4().hex[:8]
        tasks.append({
            "task_name": f"AD-octops-trainingtask-{task_uuid}",
            "code_Id": 768,
            "package_name": f"dataset-v4-{task_uuid}",
            "dataset_name": "data1.zip",
            "file_location": "D:/数据集/cnn数据集/data1/",
            "CPU": 24,
            "MEMORY": 256,
            "NPU": 1
        })

    logger.info(f"📋 生成任务总数: {len(tasks)}")
    return tasks


def process_task_queue():
    """Drain the global task queue (extension point for follow-up logic).

    Blocks up to 10 s per get(); exits when the queue stays empty, so this
    never hangs after all workers have finished.
    """
    logger.info("🚀 启动任务队列处理线程")
    while True:
        try:
            # 10-second timeout so an empty queue terminates the loop
            # instead of blocking forever.
            task_info = task_queue.get(timeout=10)
            logger.info(f"🔄 处理队列任务: {task_info}")

            # TODO: add follow-up handling for queued tasks here,
            # e.g. submit to a scheduler, persist to a DB, send notifications.

            task_queue.task_done()  # mark this queue item as processed
        except queue.Empty:
            logger.info("📦 队列为空,无待处理任务")
            break
    logger.info("🛑 任务队列处理线程结束")


def main(
    token: str,
    package_name: str,
    dataset_name: str,
    file_location: str,
    code_Id: int,
    task_name: str
):
    """Run the full nine-step submission flow for a single task.

    Args:
        token: bearer token from get_token().
        package_name: unique dataset-package name to create and bind.
        dataset_name: file name of the dataset archive to upload.
        file_location: local directory containing the dataset file.
        code_Id: code identifier recorded with the queued task.
        task_name: human-readable task identifier used in logs/queue.

    Any step failure is logged and aborts the flow (returns early);
    nothing is raised to the caller. On full success a task record is
    pushed onto the global task_queue.
    """
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}'
    }
    logger.info(f"📌 开始处理任务: {task_name} (包名: {package_name})")

    # ---------------------- Step 2: create package folder ----------------------
    try:
        create_url = "http://119.45.255.234:30180/jsm/jobSet/createPackage"
        create_payload = {
            "userID": 5,
            "name": package_name,
            "dataType": "dataset",
            "packageID": 0,
            "uploadPriority": {"type": "specify", "clusters": []},
            "bindingInfo": {
                "clusterIDs": [], "name": package_name, "category": "image",
                "type": "dataset", "imageID": "", "bias": [], "region": [],
                "chip": ["ASCEND"], "selectedCluster": [], "modelType": "",
                "env": "", "version": "", "packageID": 0, "points": 0
            }
        }
        response = requests.post(create_url, json=create_payload, headers=headers, timeout=15)
        response.raise_for_status()
        result = response.json()
        package_id = result['data']['newPackage']['packageID']
        logger.info(f"✅ 第二步:创建文件夹成功 (packageID: {package_id})")
    except Exception as e:
        logger.error(f"❌ 第二步:创建文件夹失败: {e}", exc_info=True)
        return

    # ---------------------- Step 3: query dataset list ----------------------
    try:
        query_url = "http://119.45.255.234:30180/jsm/jobSet/queryUploaded"
        query_payload = {
            "queryParams": {
                "dataType": "dataset", "userID": 5, "packageID": package_id,
                "path": "", "CurrentPage": 1, "pageSize": 10, "orderBy": "name"
            }
        }
        response = requests.post(query_url, json=query_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ 第三步:查询数据集列表成功")
    except Exception as e:
        logger.error(f"❌ 第三步:查询数据集列表失败: {e}", exc_info=True)
        return

    # ---------------------- Step 4: upload the file ----------------------
    try:
        upload_url = "http://121.36.5.116:32010/object/upload"
        file_path = f"{file_location}{dataset_name}"
        info_data = {
            "userID": 5, "packageID": package_id, "loadTo": [],
            "loadToPath": [f"/dataset/5/{package_name}/"]
        }
        # Open via a context manager so the handle is always closed
        # (the original leaked the file object on every upload).
        with open(file_path, 'rb') as dataset_file:
            form_data = {
                "info": (None, json.dumps(info_data)),
                "files": dataset_file
            }
            response = requests.post(
                upload_url,
                files=form_data,
                headers={'Authorization': f'Bearer {token}'},
                timeout=300  # large-file upload: 5-minute timeout
            )
        response.raise_for_status()
        result = response.json()["data"]["uploadeds"][0]
        object_id = result["objectID"]
        logger.info(f"✅ 第四步:文件上传成功 (objectID: {object_id})")
    except FileNotFoundError:
        logger.error(f"❌ 第四步:文件不存在: {file_path}")
        return
    except Exception as e:
        logger.error(f"❌ 第四步:文件上传失败: {e}", exc_info=True)
        return

    # ---------------------- Step 5: notify upload complete ----------------------
    try:
        notify_url = "http://119.45.255.234:30180/jsm/jobSet/notifyUploaded"
        notify_payload = {
            "userID": 5, "packageID": package_id,
            "uploadParams": {
                "dataType": "dataset",
                "uploadInfo": {"type": "local", "localPath": dataset_name, "objectIDs": [object_id]}
            }
        }
        response = requests.post(notify_url, json=notify_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ 第五步:通知上传完成成功")
    except Exception as e:
        logger.error(f"❌ 第五步:通知上传完成失败: {e}", exc_info=True)
        return

    # ---------------------- Step 6: re-query upload result ----------------------
    try:
        query_url = "http://119.45.255.234:30180/jsm/jobSet/queryUploaded"
        query_payload = {
            "queryParams": {
                "dataType": "dataset", "userID": 5, "packageID": package_id,
                "path": "", "CurrentPage": 1, "pageSize": 10, "orderBy": "name"
            }
        }
        response = requests.post(query_url, json=query_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ 第六步:二次查询上传结果成功")
    except Exception as e:
        logger.error(f"❌ 第六步:二次查询上传结果失败: {e}", exc_info=True)
        return

    # ---------------------- Step 7: query upload status ----------------------
    try:
        status_url = "http://119.45.255.234:30180/jsm/jobSet/uploadStatus"
        status_payload = {
            "userID": 5, "operate": "query", "packageID": package_id, "dataType": "dataset"
        }
        response = requests.post(status_url, json=status_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ 第七步:查询上传状态成功")
    except Exception as e:
        logger.error(f"❌ 第七步:查询上传状态失败: {e}", exc_info=True)
        return

    # ---------------------- Step 8: bind clusters ----------------------
    try:
        binding_url = "http://119.45.255.234:30180/jsm/jobSet/binding"
        binding_payload = {
            "userID": 5,
            "info": {"type": "dataset", "packageID": package_id, "clusterIDs": []}
        }
        response = requests.post(binding_url, json=binding_payload, headers=headers, timeout=15)
        response.raise_for_status()
        logger.info("✅ 第八步:绑定集群成功")
    except Exception as e:
        logger.error(f"❌ 第八步:绑定集群失败: {e}", exc_info=True)
        return

    # ---------------------- Step 9: query binding result ----------------------
    target_id = None
    try:
        query_binding_url = "http://119.45.255.234:30180/jsm/jobSet/queryBinding"
        query_binding_payload = {
            "dataType": "dataset",
            "param": {"userID": 5, "bindingID": -1, "type": "private"}
        }
        response = requests.post(query_binding_url, json=query_binding_payload, headers=headers, timeout=15)
        response.raise_for_status()
        result = response.json()

        # Find the binding record whose name matches our package.
        for data in result.get("data", {}).get("datas", []):
            if data.get("info", {}).get("name") == package_name:
                target_id = data.get("ID")
                break

        if target_id:
            logger.info(f"✅ 第九步:查询绑定结果成功 (target_id: {target_id})")
        else:
            logger.warning(f"⚠️ 第九步:未找到 {package_name} 对应的绑定ID")
            return
    except Exception as e:
        logger.error(f"❌ 第九步:查询绑定结果失败: {e}", exc_info=True)
        return

    # ---------------------- After step 9: enqueue the task ----------------------
    task_info = {
        "task_name": task_name,
        "package_name": package_name,
        "dataset_name": dataset_name,
        "code_Id": code_Id,
        "target_id": target_id,
        "status": "submitted",
        "timestamp": time.time()
    }
    task_queue.put(task_info)
    logger.info(f"📥 任务 {task_name} 已加入队列 (队列大小: {task_queue.qsize()})")


if __name__ == "__main__":
    # 1. Obtain the auth token.
    token = get_token()
    if not token:
        logger.error("💥 无法获取Token,程序退出")
        sys.exit(1)  # sys.exit, not the interactive-only exit() helper

    # 2. Generate the task list.
    tasks = generate_tasks()

    # 3. Run tasks concurrently (bounded at 5 workers).
    max_workers = 5
    logger.info(f"🔧 启动线程池,并发数: {max_workers}")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                main,
                token=token,
                package_name=task["package_name"],
                dataset_name=task["dataset_name"],
                file_location=task["file_location"],
                code_Id=task["code_Id"],
                task_name=task["task_name"]
            ) for task in tasks
        ]

        # Wait for all tasks and surface any worker exceptions.
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"💥 任务执行异常: {e}", exc_info=True)

    # 4. Drain the task queue.
    process_task_queue()

    logger.info(" 所有任务处理完毕")