对上传文件进行预处理,增加MIME类型的model模型,增加发送邮件及获取测试结果的数据校验

This commit is contained in:
wangjie 2023-12-01 12:16:16 +08:00
parent bbefb620ec
commit d0f07a67c4
7 changed files with 164 additions and 62 deletions

View File

@ -6,8 +6,9 @@
# @project : SensoroApi # @project : SensoroApi
import base64 import base64
import json import json
import os
import time import time
from typing import Optional, Tuple, Any from typing import Optional, Tuple, Any, Union, Dict
import allure import allure
import py3curl import py3curl
@ -16,10 +17,11 @@ from requests import PreparedRequest
from requests.structures import CaseInsensitiveDict from requests.structures import CaseInsensitiveDict
from common.base_log import logger from common.base_log import logger
from common.exceptions import ValueNotFoundError, ValueTypeError from common.exceptions import ValueTypeError
from common.models import Method from common.models import Method
from configs.lins_environment import EntryPoint from configs.lins_environment import EntryPoint
from utils.allure_handle import allure_attach_text, allure_attach_json from utils.MIME_type_classifier import get_MIME
from utils.allure_handle import allure_attach_text, allure_attach_json, allure_attach_file
from utils.time_utils import TimeUtil from utils.time_utils import TimeUtil
@ -71,6 +73,34 @@ class BaseApi:
} }
return params return params
@staticmethod
def _make_files(files_info: Union[str, Dict[str, str]]) -> Dict[str, Tuple[str, Any, str]]:
"""
对上传文件进行预处理
:param files_info: 支持str和dict两种传参方式str时只需要传文件名即可该文件字段名默认为file如果后端要求字段名不是file可以字典的方式传入k是字段名v是文件路径{"file":'/Users/wangjie/Desktop/111.png'}
:return:
"""
if files_info is None:
return {}
# 类型检查
if not isinstance(files_info, (str, dict)):
raise TypeError("files_info必须是字符串或字典")
# 如果传入的是单个文件路径转换为包含该路径的字典并带上默认字段名file
if isinstance(files_info, str):
files_info = {'file': files_info}
# 准备上传文件的数据
files = {}
for field_name, file_path in files_info.items():
if not isinstance(field_name, str) or not isinstance(file_path, str):
raise TypeError("files_info字典中的每个条目必须是一个字符串键和一个字符串值")
file_name = os.path.basename(file_path)
files[field_name] = (file_name, open(file_path, 'rb'), get_MIME(file_path))
with allure.step("上传的附件"):
allure_attach_file(file_name, file_path)
return files
@staticmethod @staticmethod
def request(method, address, headers=None, params=None, data=None, json_data=None, files=None) -> requests.Response: def request(method, address, headers=None, params=None, data=None, json_data=None, files=None) -> requests.Response:
"""发送http请求返回response对象""" """发送http请求返回response对象"""
@ -78,6 +108,7 @@ class BaseApi:
url = BaseApi._make_url(address) url = BaseApi._make_url(address)
headers = BaseApi._make_headers(headers) headers = BaseApi._make_headers(headers)
method = BaseApi._make_method(method) method = BaseApi._make_method(method)
files = BaseApi._make_files(files)
# 发送请求 # 发送请求
try: try:
@ -93,7 +124,7 @@ class BaseApi:
r_headers = response.request.headers r_headers = response.request.headers
r_body = BaseApi.get_request_body(response) r_body = BaseApi.get_request_body(response)
r_curl = BaseApi.request_to_curl(response) r_curl = BaseApi.request_to_curl(response)
r_respone = response.json() r_respone = BaseApi.get_json(response)
r_duration = duration r_duration = duration
r_respone_status_code = response.status_code r_respone_status_code = response.status_code
r_respone_headers = response.headers r_respone_headers = response.headers
@ -154,13 +185,14 @@ class BaseApi:
files=files) files=files)
@staticmethod @staticmethod
def get_json(response: requests.Response) -> json: def get_json(response: requests.Response) -> Union[Dict[str, Any], str]:
"""获取响应结果的json格式""" """获取响应结果的json格式"""
if response: try:
json_data = response.json() json_data = response.json()
return json_data return json_data
else: except json.JSONDecodeError:
raise ValueNotFoundError('请求返回结果为空,无法获取响应') # 如果json解析失败则返回原始响应体文本
return f'解码JSON失败或响应为空,返回原始响应:{response.text}'
@staticmethod @staticmethod
def get_text(response: requests.Response) -> str: def get_text(response: requests.Response) -> str:

View File

@ -11,61 +11,27 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
from common.base_log import logger from common.base_log import logger
from common.exceptions import SendMessageError from common.exceptions import SendMessageError
from common.models import Email
from utils.MIME_type_classifier import get_MIME
class MailSender: class MailSender:
"""发送邮件功能""" """发送邮件功能"""
def __init__(self, mail_subject, sender_username, sender_password, receiver_mail_list, def __init__(self, email_data: dict):
smtp_domain, smtp_port): email_data = Email(**email_data)
self.subject = mail_subject # 邮件标题 self.subject = email_data.mail_subject # 邮件标题
self.sender_username = sender_username # 发件人邮箱 self.sender_username = email_data.sender_username # 发件人邮箱
self.sender_password = sender_password # 发件人邮箱授权码 self.sender_password = email_data.sender_password # 发件人邮箱授权码
self.receiver_mail_list = receiver_mail_list # 收件人邮箱 self.receiver_mail_list = email_data.receiver_mail_list # 收件人邮箱
self.smtp_domain = smtp_domain # 发送邮箱的域名 self.smtp_domain = email_data.smtp_domain # 发送邮箱的域名
self.smtp_port = smtp_port # 发送邮箱的端口号 self.smtp_port = email_data.smtp_port # 发送邮箱的端口号
self.message = MIMEMultipart() self.message = MIMEMultipart()
self.message['From'] = Header(self.sender_username, 'utf-8') self.message['From'] = Header(self.sender_username, 'utf-8')
self.message['To'] = ';'.join(self.receiver_mail_list) self.message['To'] = ';'.join(self.receiver_mail_list)
self.message['subject'] = Header(self.subject, 'utf-8') self.message['subject'] = Header(self.subject, 'utf-8')
def get_MIME(self, file_name):
"""多用途互联网邮件扩展类型,根据发送文件的后缀匹配对应的类型"""
d = {
'image/png': ['png'],
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': ['xlsx'],
'application/vnd.openxmlformats-officedocument.presentationml.presentation': ['pptx'],
'application/pdf': ['pdf'],
'image/jpeg': ['jpg', 'jpeg'],
'application/zip': ['zip'],
'text/plain': ['txt'],
'video/mp4': ['mp4'],
'application/msword': ['doc', 'dot'],
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': ['docx'],
'application/vnd.openxmlformats-officedocument.wordprocessingml.template': ['dotx'],
'application/vnd.ms-word.document.macroEnabled.12': ['docm'],
'application/vnd.ms-word.template.macroEnabled.12': ['dotm'],
'application/vnd.ms-excel': ['xls', 'xlt', 'xla'],
'application/vnd.openxmlformats-officedocument.spreadsheetml.template': ['xltx'],
'application/vnd.ms-excel.sheet.macroEnabled.12': ['xlsm'],
'application/vnd.ms-excel.template.macroEnabled.12': ['xltm'],
'application/vnd.ms-excel.addin.macroEnabled.12': ['xlam'],
'application/vnd.ms-excel.sheet.binary.macroEnabled.12': ['xlsb'],
'application/vnd.ms-powerpoint': ['ppt', 'pot', 'pps', 'ppa'],
'application/vnd.openxmlformats-officedocument.presentationml.slideshow': ['ppsx'],
'application/vnd.ms-powerpoint.addin.macroEnabled.12': ['ppam'],
'application/vnd.ms-powerpoint.presentation.macroEnabled.12': ['pptm', 'potm'],
'application/vnd.ms-powerpoint.slideshow.macroEnabled.12': ['ppsm'],
'application/x-tar': ['tar'],
}
# 获取文件后缀
hz = file_name.split('.')[-1]
for key, value in d.items():
if hz in value:
return key
return 'application/octet-stream' # 一切未知类型
def attach_text(self, content): def attach_text(self, content):
"""添加邮件正文内容""" """添加邮件正文内容"""
self.message.attach(MIMEText(content, 'html', 'utf-8')) self.message.attach(MIMEText(content, 'html', 'utf-8'))
@ -75,7 +41,7 @@ class MailSender:
"""添加附件""" """添加附件"""
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
mime_app = MIMEApplication(f.read()) mime_app = MIMEApplication(f.read())
mime_app['Content-Type'] = self.get_MIME(file_path) mime_app['Content-Type'] = get_MIME(file_path)
mime_app.add_header('content-disposition', 'attachment', filename='reporter.html') mime_app.add_header('content-disposition', 'attachment', filename='reporter.html')
self.message.attach(mime_app) self.message.attach(mime_app)
return self return self
@ -98,6 +64,13 @@ class MailSender:
if __name__ == '__main__': if __name__ == '__main__':
MailSender('测试邮件发送', 'xxxxx@xxxx.com', 'xxxxxxx', email_config = {
['xxxxx@xxxx.com'], 'smtp.exmail.qq.com', 465).attach_text( 'mail_subject': '接口自动化测试报告', # 邮件标题
'测试邮件').attach_file('/Users/wangjie/SensoroApi/outFiles/pytest_report/report.html').send() 'sender_username': '1231323@qq.com', # 发件人邮箱
'sender_password': 'xxxxxxxxx', # 发件人邮箱授权码
'receiver_mail_list': ['1231323@qq.com', ], # 收件人邮箱
'smtp_domain': 'smtp.exmail.qq.com', # 发送邮箱的域名
'smtp_port': 465, # 发送邮箱的端口号
}
MailSender(email_config).attach_text('测试邮件').attach_file(
'/Users/wangjie/SensoroApiAutoTest/outFiles/pytest_report/pytest_report.html').send()

View File

@ -6,7 +6,9 @@
# @project : SensoroApi # @project : SensoroApi
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, unique from enum import Enum, unique
from typing import Text from typing import Text, List, Union
from pydantic import BaseModel
class Environment(Enum): class Environment(Enum):
@ -74,6 +76,43 @@ class AllureAttachmentType(Enum):
PDF = "pdf" PDF = "pdf"
class Email(BaseModel):
mail_subject: Union[Text, None] = None # 邮件标题
sender_username: Text # 发件人邮箱
sender_password: Text # 发件人邮箱授权码
receiver_mail_list: List[Text] # 收件人邮箱
smtp_domain: Text # 发送邮箱的域名
smtp_port: Union[int, None] = None # 发送邮箱的端口号
class MIMEFileType(Enum):
PNG = ('image/png', ['png'])
XLSX = ('application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', ['xlsx'])
PPTX = ('application/vnd.openxmlformats-officedocument.presentationml.presentation', ['pptx'])
PDF = ('application/pdf', ['pdf'])
JPG = ('image/jpeg', ['jpg', 'jpeg'])
ZIP = ('application/zip', ['zip'])
TXT = ('text/plain', ['txt'])
MP4 = ('video/mp4', ['mp4'])
DOC = ('application/msword', ['doc', 'dot'])
DOCX = ('application/vnd.openxmlformats-officedocument.wordprocessingml.document', ['docx'])
DOTX = ('application/vnd.openxmlformats-officedocument.wordprocessingml.template', ['dotx'])
DOCM = ('application/vnd.ms-word.document.macroEnabled.12', ['docm'])
DOTM = ('application/vnd.ms-word.template.macroEnabled.12', ['dotm'])
XLS = ('application/vnd.ms-excel', ['xls', 'xlt', 'xla'])
XLTX = ('application/vnd.openxmlformats-officedocument.spreadsheetml.template', ['xltx'])
XLSM = ('application/vnd.ms-excel.sheet.macroEnabled.12', ['xlsm'])
XLTM = ('application/vnd.ms-excel.template.macroEnabled.12', ['xltm'])
XLAM = ('application/vnd.ms-excel.addin.macroEnabled.12', ['xlam'])
XLSB = ('application/vnd.ms-excel.sheet.binary.macroEnabled.12', ['xlsb'])
PPT = ('application/vnd.ms-powerpoint', ['ppt', 'pot', 'pps', 'ppa'])
PPSX = ('application/vnd.openxmlformats-officedocument.presentationml.slideshow', ['ppsx'])
PPAM = ('application/vnd.ms-powerpoint.addin.macroEnabled.12', ['ppam'])
PPTM = ('application/vnd.ms-powerpoint.presentation.macroEnabled.12', ['pptm', 'potm'])
PPSM = ('application/vnd.ms-powerpoint.slideshow.macroEnabled.12', ['ppsm'])
TAR = ('application/x-tar', ['tar'])
if __name__ == '__main__': if __name__ == '__main__':
print(Environment.DEV.name) print(Environment.DEV.name)
print(Environment.DEV.value) print(Environment.DEV.value)

View File

@ -78,7 +78,7 @@ def pytest_runtest_makereport(item, call): # description取值为用例说明__
def pytest_terminal_summary(terminalreporter, exitstatus, config): def pytest_terminal_summary(terminalreporter, exitstatus, config):
"""收集测试结果展示在控制台""" """收集测试结果展示在控制台"""
pytest_result = TestMetrics(**ReportDataHandle.pytest_json_report_case_count()) pytest_result = ReportDataHandle.pytest_json_report_case_count()
run_time = round((time.time() - terminalreporter._sessionstarttime), 2) run_time = round((time.time() - terminalreporter._sessionstarttime), 2)
print("******用例执行结果统计******") print("******用例执行结果统计******")
print(f"总用例数:{pytest_result.total}") print(f"总用例数:{pytest_result.total}")

5
run.py
View File

@ -15,7 +15,6 @@ from dataclasses import asdict
import pytest import pytest
from common.base_log import logger from common.base_log import logger
from common.models import TestMetrics
from utils.allure_handle import AllureReportBeautiful from utils.allure_handle import AllureReportBeautiful
from utils.command_parser import command_parser from utils.command_parser import command_parser
from common.mail_sender import MailSender from common.mail_sender import MailSender
@ -77,7 +76,7 @@ if __name__ == '__main__':
# ------------------------------发送通知消息---------------------------------- # ------------------------------发送通知消息----------------------------------
# 发送企业微信群聊 # 发送企业微信群聊
pytest_result = asdict(TestMetrics(**ReportDataHandle.pytest_json_report_case_count())) pytest_result = asdict(ReportDataHandle.pytest_json_report_case_count())
if IS_SEND_WECHAT: # 判断是否需要发送企业微信 if IS_SEND_WECHAT: # 判断是否需要发送企业微信
EnterpriseWechatNotification(wechat_webhook_url).send_markdown( EnterpriseWechatNotification(wechat_webhook_url).send_markdown(
DataProcessor().process_data(wechat_content, pytest_result)) DataProcessor().process_data(wechat_content, pytest_result))
@ -85,5 +84,5 @@ if __name__ == '__main__':
# 发送邮件 # 发送邮件
if IS_SEND_EMAIL: # 判断是否需要发送邮件 if IS_SEND_EMAIL: # 判断是否需要发送邮件
file_path = PYTEST_REPORT_DIR + os.sep + 'pytest_report.html' file_path = PYTEST_REPORT_DIR + os.sep + 'pytest_report.html'
ms = MailSender(**email_config) ms = MailSender(email_config)
ms.attach_text(DataProcessor().process_data(email_content, pytest_result)).attach_file(file_path).send() ms.attach_text(DataProcessor().process_data(email_content, pytest_result)).attach_file(file_path).send()

View File

@ -0,0 +1,58 @@
# !/usr/bin/python
# -*- coding:utf-8 -*-
# @Time : 2023/11/30 16:57
# @Author : wangjie
# @File : MIME_type_classifier.py
# @project : SensoroApiAutoTest
from common.models import MIMEFileType
d = {
'image/png': ['png'],
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': ['xlsx'],
'application/vnd.openxmlformats-officedocument.presentationml.presentation': ['pptx'],
'application/pdf': ['pdf'],
'image/jpeg': ['jpg', 'jpeg'],
'application/zip': ['zip'],
'text/plain': ['txt'],
'video/mp4': ['mp4'],
'application/msword': ['doc', 'dot'],
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': ['docx'],
'application/vnd.openxmlformats-officedocument.wordprocessingml.template': ['dotx'],
'application/vnd.ms-word.document.macroEnabled.12': ['docm'],
'application/vnd.ms-word.template.macroEnabled.12': ['dotm'],
'application/vnd.ms-excel': ['xls', 'xlt', 'xla'],
'application/vnd.openxmlformats-officedocument.spreadsheetml.template': ['xltx'],
'application/vnd.ms-excel.sheet.macroEnabled.12': ['xlsm'],
'application/vnd.ms-excel.template.macroEnabled.12': ['xltm'],
'application/vnd.ms-excel.addin.macroEnabled.12': ['xlam'],
'application/vnd.ms-excel.sheet.binary.macroEnabled.12': ['xlsb'],
'application/vnd.ms-powerpoint': ['ppt', 'pot', 'pps', 'ppa'],
'application/vnd.openxmlformats-officedocument.presentationml.slideshow': ['ppsx'],
'application/vnd.ms-powerpoint.addin.macroEnabled.12': ['ppam'],
'application/vnd.ms-powerpoint.presentation.macroEnabled.12': ['pptm', 'potm'],
'application/vnd.ms-powerpoint.slideshow.macroEnabled.12': ['ppsm'],
'application/x-tar': ['tar'],
}
def get_MIME(file_name):
"""
多用途互联网邮件扩展类型,根据文件后缀匹配文件类型并返回相应的 MIME 类型
:param file_name:文件名或者文件路径
:return:对应的 MIME 类型
"""
extension = file_name.split('.')[-1].lower()
for file_type in MIMEFileType:
if extension in file_type.value[1]:
return file_type.value[0]
return 'application/octet-stream' # 一切未知类型
if __name__ == '__main__':
mime_type = get_MIME('example.png')
print(mime_type)

View File

@ -7,6 +7,7 @@
import json import json
import os import os
from common.models import TestMetrics
from configs.dir_path_config import PYTEST_RESULT_DIR from configs.dir_path_config import PYTEST_RESULT_DIR
from utils.time_utils import TimeUtil from utils.time_utils import TimeUtil
@ -43,7 +44,7 @@ class ReportDataHandle:
case_count["duration"] = round(pytest_result["duration"], 2) # 用例运行时间 case_count["duration"] = round(pytest_result["duration"], 2) # 用例运行时间
case_count["start_time"] = TimeUtil.unix_time_to_str(int('%.f' % pytest_result["created"])) # 用例开始时间 case_count["start_time"] = TimeUtil.unix_time_to_str(int('%.f' % pytest_result["created"])) # 用例开始时间
return case_count return TestMetrics(**case_count)
if __name__ == '__main__': if __name__ == '__main__':