Update main.py

This commit is contained in:
alantang 2025-06-04 21:24:31 +08:00 committed by GitHub
parent f9702e0560
commit e503091241
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 52 additions and 78 deletions

130
main.py
View File

@ -77,16 +77,6 @@ cache_valid_days = 7 # 缓存有效期(天)
if not os.path.exists(cache_folder):
os.makedirs(cache_folder)
# 模拟浏览器的请求头
BROWSER_HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Cache-Control': 'max-age=0',
}
# 加载缓存
def load_cache():
if os.path.exists(cache_file):
@ -172,8 +162,7 @@ async def fetch_channels(session, url, cache):
if not cache_hit:
try:
# 使用浏览器请求头发送请求
async with session.get(url, headers=BROWSER_HEADERS) as response:
async with session.get(url) as response:
response.raise_for_status()
content = await response.text()
response.encoding = 'utf-8'
@ -274,12 +263,11 @@ async def filter_source_urls(template_file):
source_urls = config.source_urls
cache = load_cache()
# 创建带有浏览器请求头的会话
all_channels = OrderedDict()
async with aiohttp.ClientSession() as session:
tasks = [fetch_channels(session, url, cache) for url in source_urls]
fetched_channels_list = await asyncio.gather(*tasks)
all_channels = OrderedDict()
for fetched_channels in fetched_channels_list:
merge_channels(all_channels, fetched_channels)
@ -319,15 +307,7 @@ def merge_channels(target, source):
def is_ipv6(url):
return re.match(r'^http:\/\/\[[0-9a-fA-F:]+\]', url) is not None
async def test_url(session, url):
try:
# 使用浏览器请求头测试URL
async with session.head(url, headers=BROWSER_HEADERS, timeout=5) as response:
return response.status == 200
except Exception:
return False
async def updateChannelUrlsM3U(channels, template_channels, cache):
def updateChannelUrlsM3U(channels, template_channels, cache):
written_urls_ipv4 = set()
written_urls_ipv6 = set()
url_changes = {"added": [], "removed": [], "modified": []}
@ -368,67 +348,61 @@ async def updateChannelUrlsM3U(channels, template_channels, cache):
ipv6_m3u_path = os.path.join(output_folder, "live_ipv6.m3u")
ipv6_txt_path = os.path.join(output_folder, "live_ipv6.txt")
# 创建带有浏览器请求头的会话用于测试URL
async with aiohttp.ClientSession() as session:
with open(ipv4_m3u_path, "w", encoding="utf-8") as f_m3u_ipv4, \
open(ipv4_txt_path, "w", encoding="utf-8") as f_txt_ipv4, \
open(ipv6_m3u_path, "w", encoding="utf-8") as f_m3u_ipv6, \
open(ipv6_txt_path, "w", encoding="utf-8") as f_txt_ipv6:
with open(ipv4_m3u_path, "w", encoding="utf-8") as f_m3u_ipv4, \
open(ipv4_txt_path, "w", encoding="utf-8") as f_txt_ipv4, \
open(ipv6_m3u_path, "w", encoding="utf-8") as f_m3u_ipv6, \
open(ipv6_txt_path, "w", encoding="utf-8") as f_txt_ipv6:
f_m3u_ipv4.write(f"""#EXTM3U x-tvg-url={",".join(f'"{epg_url}"' for epg_url in config.epg_urls)}\n""")
f_m3u_ipv6.write(f"""#EXTM3U x-tvg-url={",".join(f'"{epg_url}"' for epg_url in config.epg_urls)}\n""")
f_m3u_ipv4.write(f"""#EXTM3U x-tvg-url={",".join(f'"{epg_url}"' for epg_url in config.epg_urls)}\n""")
f_m3u_ipv6.write(f"""#EXTM3U x-tvg-url={",".join(f'"{epg_url}"' for epg_url in config.epg_urls)}\n""")
for group in config.announcements:
f_txt_ipv4.write(f"{group['channel']},#genre#\n")
f_txt_ipv6.write(f"{group['channel']},#genre#\n")
for announcement in group['entries']:
url = announcement['url']
url = remove_unnecessary_params(url)
if is_ipv6(url):
if url not in written_urls_ipv6 and is_valid_url(url):
written_urls_ipv6.add(url)
write_to_files(f_m3u_ipv6, f_txt_ipv6, group['channel'], announcement['name'], 1, url)
else:
if url not in written_urls_ipv4 and is_valid_url(url):
written_urls_ipv4.add(url)
write_to_files(f_m3u_ipv4, f_txt_ipv4, group['channel'], announcement['name'], 1, url)
for group in config.announcements:
f_txt_ipv4.write(f"{group['channel']},#genre#\n")
f_txt_ipv6.write(f"{group['channel']},#genre#\n")
for announcement in group['entries']:
url = announcement['url']
url = remove_unnecessary_params(url)
if is_ipv6(url):
if url not in written_urls_ipv6 and is_valid_url(url):
written_urls_ipv6.add(url)
write_to_files(f_m3u_ipv6, f_txt_ipv6, group['channel'], announcement['name'], 1, url)
else:
if url not in written_urls_ipv4 and is_valid_url(url):
written_urls_ipv4.add(url)
write_to_files(f_m3u_ipv4, f_txt_ipv4, group['channel'], announcement['name'], 1, url)
for category, channel_list in template_channels.items():
f_txt_ipv4.write(f"{category},#genre#\n")
f_txt_ipv6.write(f"{category},#genre#\n")
if category in channels:
for channel_name in channel_list:
if channel_name in channels[category]:
sorted_urls_ipv4 = []
sorted_urls_ipv6 = []
for url in channels[category][channel_name]:
url = remove_unnecessary_params(url)
if await test_url(session, url):
if is_ipv6(url):
if url not in written_urls_ipv6 and is_valid_url(url):
sorted_urls_ipv6.append(url)
written_urls_ipv6.add(url)
else:
if url not in written_urls_ipv4 and is_valid_url(url):
sorted_urls_ipv4.append(url)
written_urls_ipv4.add(url)
for category, channel_list in template_channels.items():
f_txt_ipv4.write(f"{category},#genre#\n")
f_txt_ipv6.write(f"{category},#genre#\n")
if category in channels:
for channel_name in channel_list:
if channel_name in channels[category]:
sorted_urls_ipv4 = []
sorted_urls_ipv6 = []
for url in channels[category][channel_name]:
url = remove_unnecessary_params(url)
if is_ipv6(url):
if url not in written_urls_ipv6 and is_valid_url(url):
sorted_urls_ipv6.append(url)
written_urls_ipv6.add(url)
else:
if url not in written_urls_ipv4 and is_valid_url(url):
sorted_urls_ipv4.append(url)
written_urls_ipv4.add(url)
sorted_urls_ipv4 = sorted_urls_ipv4[:20]
sorted_urls_ipv6 = sorted_urls_ipv6[:20]
total_urls_ipv4 = len(sorted_urls_ipv4)
total_urls_ipv6 = len(sorted_urls_ipv6)
total_urls_ipv4 = len(sorted_urls_ipv4)
total_urls_ipv6 = len(sorted_urls_ipv6)
for index, url in enumerate(sorted_urls_ipv4, start=1):
new_url = add_url_suffix(url, index, total_urls_ipv4, "IPV4")
write_to_files(f_m3u_ipv4, f_txt_ipv4, category, channel_name, index, new_url)
for index, url in enumerate(sorted_urls_ipv4, start=1):
new_url = add_url_suffix(url, index, total_urls_ipv4, "IPV4")
write_to_files(f_m3u_ipv4, f_txt_ipv4, category, channel_name, index, new_url)
for index, url in enumerate(sorted_urls_ipv6, start=1):
new_url = add_url_suffix(url, index, total_urls_ipv6, "IPV6")
write_to_files(f_m3u_ipv6, f_txt_ipv6, category, channel_name, index, new_url)
for index, url in enumerate(sorted_urls_ipv6, start=1):
new_url = add_url_suffix(url, index, total_urls_ipv6, "IPV6")
write_to_files(f_m3u_ipv6, f_txt_ipv6, category, channel_name, index, new_url)
f_txt_ipv4.write("\n")
f_txt_ipv6.write("\n")
f_txt_ipv4.write("\n")
f_txt_ipv6.write("\n")
# 保存URL变化日志
if url_changes["added"] or url_changes["removed"] or url_changes["modified"]:
@ -489,7 +463,7 @@ CCTV-2
loop = asyncio.get_event_loop()
channels, template_channels, cache = loop.run_until_complete(filter_source_urls(template_file))
loop.run_until_complete(updateChannelUrlsM3U(channels, template_channels, cache))
updateChannelUrlsM3U(channels, template_channels, cache)
loop.close()
print("操作完成结果已保存到live文件夹。")
except Exception as e: