dupPRdetect/detect_trustie.py

import requests
import json
import jieba
import difflib
from fuzzywuzzy import fuzz
import numpy as np
from collections import Counter
from tqdm import tqdm
import math
import redis

# Personal access token for the Trustie forge API (left empty here).
token = ""
# Redis caches per-PR data (title, description, changed files) between runs.
r = redis.StrictRedis(host="localhost", port=6379, db=0, decode_responses=True)
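# Redis key layout used below (derived from main / detect_text_file_dup):
#   "<project_id>_<repo_id>_prNum"              -> number of pull requests in the project
#   "<project_id>_<repo_id>_<pull_request_id>"  -> JSON {"title": ..., "description": ..., "file": [...]}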

def crawl_api(url):
    """Fetch a JSON API endpoint; return None when the forge reports the resource is missing."""
    header = {"Authorization": "token %s" % token}
    response = requests.get(url, headers=header, stream=True)
    r = response.json()
    if "status" in r.keys():
        if r['status'] == -1:
            return None
    return r

def crawl_singel_pr(url):
    """Fetch a single PR / project JSON endpoint (same logic as crawl_api)."""
    header = {"Authorization": "token %s" % token}
    response = requests.get(url, headers=header, stream=True)
    r = response.json()
    if "status" in r.keys():
        if r['status'] == -1:
            return None
    return r

def crawl_file_to_pr(all_pr_info, repo, url_head):
    """Attach the changed-file payload of every pull request to all_pr_info."""
    print("----------Obtain files for each pull request--------------")
    for pr in tqdm(all_pr_info["issues"]):
        file_url = url_head[0] + "//" + url_head[2] + "/api/" + repo + "/pulls/" + str(pr["pull_request_id"]) + "/files.json"
        header = {"Authorization": "token %s" % token}
        response = requests.get(file_url, headers=header, stream=True)
        r = response.json()
        pr["files"] = r
    return all_pr_info

def crawl_all_pr(url):
    """Fetch the full pull-request list of a project, paging through the Trustie API."""
    header = {"Authorization": "token %s" % token}
    response = requests.get(url, headers=header, stream=True)
    r = response.json()
    r["issues"] = []
    if "status" in r.keys():  # check whether the resource exists
        if r['status'] == -1:
            return None
    page = math.ceil(r["search_count"] / 20)  # 20 is Trustie's per-page limit for pull requests
    for p in range(page):  # fetch the pull-request list page by page
        url_p = url + "?page=%s&limit=20" % str(p + 1)
        response_p = requests.get(url_p, headers=header, stream=True)
        r_p = response_p.json()
        r["issues"] += r_p["issues"]
    return r
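
# Illustrative note (not in the original file): the list endpoint is assumed to return JSON
# shaped roughly like {"search_count": 57, "issues": [{"pull_request_id": 101, ...}, ...]},
# so with search_count = 57 the loop above requests ?page=1..3 with limit=20.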

def detect_text_file_dup(all_pr_info, project_key, pr_num):
    """Score every existing PR against PR `pr_num` on title/description text and changed files."""
    current_pr_value = json.loads(r.get(project_key + "_" + str(pr_num)))
    current_word = jieba.lcut(current_pr_value["title"] + " " + current_pr_value['description'])
    current_word = [x.strip() for x in current_word if x.strip() != '']
    all_score = []
    for pr in all_pr_info["issues"]:
        score = {}
        score["pr_number"] = str(pr["pull_request_id"])
        project_pr_key = project_key + "_" + str(pr["pull_request_id"])
        value = json.loads(r.get(project_pr_key))  # read the cached PR data from Redis
        # Text similarity over the tokenized title + description
        word = jieba.lcut(value["title"] + " " + value['description'])
        word = [x.strip() for x in word if x.strip() != '']
        score["text_score"] = '%.1f' % (difflib.SequenceMatcher(None, current_word, word).ratio() * 100)  # difflib score
        # Similarity between the two PRs' changed-file lists
        score["file_score"] = '%.1f' % (difflib.SequenceMatcher(None, current_pr_value["file"], value["file"]).ratio() * 100)
        score["result"], score["avg_score"] = final_decision(score["text_score"], score["file_score"])
        all_score.append(score)
    return all_score
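
# Illustrative note (not in the original file): each entry returned above looks roughly like
#   {"pr_number": "123", "text_score": "72.4", "file_score": "55.0",
#    "result": "Very likely", "avg_score": "63.7"}
# i.e. one score dict per existing pull request in the project.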

def dup_detect(src_api, dst_api):
    """Return the difflib similarity between the text of two issue/PR API payloads."""
    if src_api["issue"]["subject"] is None:
        src_api["issue"]["subject"] = ""
    if src_api["issue"]["description"] is None:
        src_api["issue"]["description"] = ""
    if dst_api["issue"]["subject"] is None:
        dst_api["issue"]["subject"] = ""
    if dst_api["issue"]["description"] is None:
        dst_api["issue"]["description"] = ""
    src_txt = src_api["issue"]["subject"] + " " + src_api["issue"]["description"]
    dst_txt = dst_api["issue"]["subject"] + " " + dst_api["issue"]["description"]
    src_word = jieba.lcut(src_txt)
    dst_word = jieba.lcut(dst_txt)
    diff_result = difflib.SequenceMatcher(None, src_word, dst_word).ratio()  # difflib score
    return diff_result

def extract_filename(file_info):
    """Extract the list of file names changed by the PR from its files.json payload."""
    file = []
    for f in file_info["files"]:
        for fs in f["sections"]:
            file.append(fs["fileName"])
    return file
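
# Illustrative note (assumed shape, not from the original source): files.json is expected to
# look roughly like {"files": [{"sections": [{"fileName": "src/app.py", ...}, ...], ...}, ...]},
# which is the nesting extract_filename walks.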

def extract_description(pr_info):
    """Parse the PR description; it may be missing, in which case return an empty string."""
    if pr_info["issue"]["description"] is not None:
        description = pr_info["issue"]["description"]
    else:
        description = ""
    return description

def main(repo, pr_num, url_head):
    """Cache every PR of the repository in Redis, then score them against PR `pr_num`."""
    project_url = url_head[0] + "//" + url_head[2] + "/api/" + repo
    all_pr_url = url_head[0] + "//" + url_head[2] + "/api/" + repo + "/pulls.json"
    pr_num_url = url_head[0] + "//" + url_head[2] + "/api/" + repo + "/pulls/" + str(pr_num) + ".json"
    project_info = crawl_singel_pr(project_url)
    # pr_info = crawl_singel_pr(pr_num_url)
    project_key = str(project_info["project_id"]) + "_" + str(project_info['repo_id'])
    project_prcnt_key = project_key + '_' + "prNum"
    project_prcnt_value = project_info["pull_requests_count"]
    r.set(project_prcnt_key, project_prcnt_value)  # cache the project's current PR count
    all_pr_info = crawl_all_pr(all_pr_url)  # fetch summary data for every PR in the project
    for p in tqdm(all_pr_info["issues"]):
        project_pr_key = project_key + "_" + str(p["pull_request_id"])
        if r.get(project_pr_key) is None:
            pr_url = url_head[0] + "//" + url_head[2] + "/api/" + repo + "/pulls/" + str(p["pull_request_id"]) + ".json"
            file_url = url_head[0] + "//" + url_head[2] + "/api/" + repo + "/pulls/" + str(p["pull_request_id"]) + "/files.json"
            pr_info = crawl_singel_pr(pr_url)
            file_info = crawl_singel_pr(file_url)
            if pr_info is None or all_pr_info is None:  # stop if the PR no longer exists
                return None
            title = pr_info["issue"]["subject"]  # PR title
            description = extract_description(pr_info)  # PR description (may be empty)
            file = extract_filename(file_info)  # list of changed file names
            value = {"title": title, "description": description, "file": file}
            r.set(project_pr_key, json.dumps(value))  # write the data to cache into Redis
        # else: already cached; the scoring step reads it straight from Redis
    text_score = detect_text_file_dup(all_pr_info, project_key, pr_num)  # compute the scores
    return text_score

def final_decision(txt_score, file_score):
    """Average the text and file scores and map the average to a likelihood label."""
    score = (float(txt_score) + float(file_score)) / 2
    if 0 <= score < 20:
        result = "Extremely unlikely"
    elif 20 <= score < 40:
        result = "Unlikely"
    elif 40 <= score < 60:
        result = "Possible"
    elif 60 <= score < 80:
        result = "Very likely"
    else:  # 80 <= score <= 100
        result = "Extremely likely"
    return result, '%.1f' % score
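

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original file. The repository path and forge URL
    # below are hypothetical placeholders; main() expects url_head to be the forge URL split
    # on "/", so that url_head[0] is the scheme ("https:") and url_head[2] is the host.
    url_head = "https://forge.example.com".split("/")
    scores = main("owner_name/repo_name", 1, url_head)
    print(json.dumps(scores, ensure_ascii=False, indent=2))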