X/main.py

import re
import asyncio
import logging
import json
import os
from collections import OrderedDict
from datetime import datetime, timedelta
import difflib
import hashlib
from urllib.parse import urlparse, urlunparse, parse_qs
# Check that aiohttp is installed
try:
    import aiohttp
except ImportError:
    print("Error: missing required dependency 'aiohttp'")
    print("Install it with:")
    print("pip install aiohttp")
    import sys
    sys.exit(1)
# Check that the config module exists
try:
    import config
    # Validate the basic structure of the config file
    required_attrs = ['source_urls', 'epg_urls', 'announcements', 'url_blacklist', 'ip_version_priority']
    for attr in required_attrs:
        if not hasattr(config, attr):
            raise AttributeError(f"Config file is missing a required attribute: {attr}")
except ImportError:
    print("Error: config module 'config.py' not found")
    print("Make sure a config.py file exists in the project directory; example content:")
    print("""
# Example config.py content
source_urls = [
    "https://example.com/source1.m3u",
    "https://example.com/source2.m3u"
]
epg_urls = ["https://example.com/epg.xml"]
announcements = [
    {
        "channel": "公告",
        "entries": [
            {
                "name": None,
                "url": "https://example.com/notice",
                "logo": "https://picsum.photos/100/100?random=1"
            }
        ]
    }
]
url_blacklist = []
ip_version_priority = "ipv4"
""")
    import sys
    sys.exit(1)
except AttributeError as e:
    print(f"Config file error: {e}")
    import sys
    sys.exit(1)
# Make sure the live folder exists (the log file lives in it, so create it before configuring logging)
output_folder = "live"
if not os.path.exists(output_folder):
    os.makedirs(output_folder)
# Logging: record errors only
logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s',
                    handlers=[logging.FileHandler("./live/function.log", "w", encoding="utf-8"), logging.StreamHandler()])
# Cache folder and cache file
cache_folder = "./live/cache"
cache_file = os.path.join(cache_folder, "url_cache.json")
cache_valid_days = 7  # cache validity period (days)
# Make sure the cache folder exists
if not os.path.exists(cache_folder):
    os.makedirs(cache_folder)
# Load the cache
def load_cache():
    if os.path.exists(cache_file):
        try:
            with open(cache_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            logging.error(f"Failed to load cache: {e}")
    return {"urls": {}, "timestamp": datetime.now().isoformat()}
# Save the cache
def save_cache(cache):
    cache["timestamp"] = datetime.now().isoformat()
    try:
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(cache, f, ensure_ascii=False, indent=2)
    except Exception as e:
        logging.error(f"Failed to save cache: {e}")
# Check whether the cache is still valid
def is_cache_valid(cache):
    if not cache:
        return False
    timestamp = datetime.fromisoformat(cache.get("timestamp", datetime.now().isoformat()))
    return (datetime.now() - timestamp).days < cache_valid_days
# Compute an MD5 hash of a string
def calculate_hash(content):
    return hashlib.md5(content.encode('utf-8')).hexdigest()
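# Note: calculate_hash is used both to key the URL cache (hash of the source URL string)
# and to fingerprint downloaded playlist content ("content_hash" in fetch_channels).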
def parse_template(template_file):
    template_channels = OrderedDict()
    current_category = None
    with open(template_file, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#"):
                if "#genre#" in line:
                    current_category = line.split(",")[0].strip()
                    template_channels[current_category] = []
                elif current_category:
                    channel_name = line.split(",")[0].strip()
                    template_channels[current_category].append(channel_name)
    return template_channels
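# Illustrative template layout that parse_template expects: one category header per
# "#genre#" line, followed by bare channel names, e.g.
#   央视,#genre#
#   CCTV-1
#   卫视,#genre#
#   湖南卫视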
def clean_channel_name(channel_name):
    cleaned_name = re.sub(r'[$「」-]', '', channel_name)
    cleaned_name = re.sub(r'\s+', '', cleaned_name)
    cleaned_name = re.sub(r'(\D*)(\d+)', lambda m: m.group(1) + str(int(m.group(2))), cleaned_name)
    return cleaned_name.upper()
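# Illustrative example (not taken from any real feed): clean_channel_name("cctv-01 综合")
# strips '-', whitespace and the leading zero, then upper-cases, yielding "CCTV1综合".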
def is_valid_url(url):
    return bool(re.match(r'^https?://', url))
def remove_unnecessary_params(url):
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    # Keep only the query parameters considered necessary; adjust this whitelist as needed
    necessary_params = {}
    for param, values in query_params.items():
        if param in ['必要参数1', '必要参数2']:  # replace with the parameter names that are actually required
            necessary_params[param] = values
    new_query = '&'.join([f'{param}={value[0]}' for param, value in necessary_params.items()])
    new_url = urlunparse((parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, new_query, parsed_url.fragment))
    return new_url
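# Note: with the placeholder whitelist above, every query string is stripped, e.g.
# remove_unnecessary_params("http://example.com/live.m3u8?token=abc") (a made-up URL)
# returns "http://example.com/live.m3u8".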
async def fetch_channels(session, url, cache):
    channels = OrderedDict()
    unique_urls = set()
    cache_hit = False
    # Check whether the URL is already cached and still valid
    url_hash = calculate_hash(url)
    if url_hash in cache["urls"]:
        cached_entry = cache["urls"][url_hash]
        if datetime.now() - datetime.fromisoformat(cached_entry["timestamp"]) <= timedelta(days=cache_valid_days):
            logging.info(f"Loaded from cache: {url}")
            channels = OrderedDict(cached_entry["channels"])
            unique_urls = set(cached_entry["unique_urls"])
            cache_hit = True
    if not cache_hit:
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                # Decode explicitly as UTF-8 (setting response.encoding after text() has no effect)
                content = await response.text(encoding="utf-8")
                lines = content.split("\n")
                is_m3u = any(line.startswith("#EXTINF") for line in lines[:15])
                if is_m3u:
                    channels.update(parse_m3u_lines(lines, unique_urls))
                else:
                    channels.update(parse_txt_lines(lines, unique_urls))
                if channels:
                    # Update the cache
                    cache["urls"][url_hash] = {
                        "url": url,
                        "channels": dict(channels),
                        "unique_urls": list(unique_urls),
                        "timestamp": datetime.now().isoformat(),
                        "content_hash": calculate_hash(content)
                    }
                    save_cache(cache)
        except Exception as e:
            logging.error(f"url: {url} failed ❌, Error: {e}")
    # Deduplicate channels once more
    for category, channel_list in channels.items():
        new_channel_list = []
        seen_channels = set()
        for channel_name, channel_url in channel_list:
            if (channel_name, channel_url) not in seen_channels:
                new_channel_list.append((channel_name, channel_url))
                seen_channels.add((channel_name, channel_url))
        channels[category] = new_channel_list
    return channels
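# Note on fetch_channels above: each source URL is cached under the MD5 of the URL string,
# and the cached channel list is reused for up to cache_valid_days days before the source
# is fetched again.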
def parse_m3u_lines(lines, unique_urls):
    channels = OrderedDict()
    current_category = None
    channel_name = None
    for line in lines:
        line = line.strip()
        if line.startswith("#EXTINF"):
            match = re.search(r'group-title="(.*?)",(.*)', line)
            if match:
                current_category = match.group(1).strip()
                channel_name = match.group(2).strip()
                if channel_name and channel_name.startswith("CCTV"):
                    channel_name = clean_channel_name(channel_name)
                if current_category not in channels:
                    channels[current_category] = []
        elif line and not line.startswith("#"):
            channel_url = line.strip()
            if is_valid_url(channel_url) and channel_url not in unique_urls:
                unique_urls.add(channel_url)
                if current_category and channel_name:
                    channels[current_category].append((channel_name, channel_url))
    return channels
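# Illustrative M3U input that parse_m3u_lines handles (made-up group and URL):
#   #EXTINF:-1 group-title="央视",CCTV-1
#   http://example.com/cctv1.m3u8
# The group-title becomes the category and the trailing text becomes the channel name.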
def parse_txt_lines(lines, unique_urls):
    channels = OrderedDict()
    current_category = None
    for line in lines:
        line = line.strip()
        if "#genre#" in line:
            current_category = line.split(",")[0].strip()
            channels[current_category] = []
        elif current_category:
            match = re.match(r"^(.*?),(.*?)$", line)
            if match:
                channel_name = match.group(1).strip()
                if channel_name and channel_name.startswith("CCTV"):
                    channel_name = clean_channel_name(channel_name)
                channel_urls = match.group(2).strip().split('#')
                for channel_url in channel_urls:
                    channel_url = channel_url.strip()
                    if is_valid_url(channel_url) and channel_url not in unique_urls:
                        unique_urls.add(channel_url)
                        channels[current_category].append((channel_name, channel_url))
            elif line:
                channels[current_category].append((line, ''))
    return channels
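# Illustrative TXT input that parse_txt_lines handles (made-up URLs); multiple URLs for
# one channel are separated with '#':
#   央视,#genre#
#   CCTV-1,http://example.com/a.m3u8#http://example.com/b.m3u8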
def find_similar_name(target_name, name_list):
    matches = difflib.get_close_matches(target_name, name_list, n=1, cutoff=0.6)
    return matches[0] if matches else None
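# Illustrative: find_similar_name("CCTV1", ["CCTV1综合", "湖南卫视"]) returns "CCTV1综合",
# since difflib's similarity ratio for that pair clears the 0.6 cutoff.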
async def filter_source_urls(template_file):
    template_channels = parse_template(template_file)
    source_urls = config.source_urls
    cache = load_cache()
    all_channels = OrderedDict()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_channels(session, url, cache) for url in source_urls]
        fetched_channels_list = await asyncio.gather(*tasks)
        for fetched_channels in fetched_channels_list:
            merge_channels(all_channels, fetched_channels)
    matched_channels = match_channels(template_channels, all_channels)
    return matched_channels, template_channels, cache
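# match_channels fuzzy-matches each template channel name (after clean_channel_name)
# against the cleaned names collected from all sources, then gathers every URL found for
# the matched online channel under the template's own category and name.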
def match_channels(template_channels, all_channels):
    matched_channels = OrderedDict()
    all_online_channel_names = []
    for online_category, online_channel_list in all_channels.items():
        for online_channel_name, _ in online_channel_list:
            all_online_channel_names.append(online_channel_name)
    for category, channel_list in template_channels.items():
        matched_channels[category] = OrderedDict()
        for channel_name in channel_list:
            similar_name = find_similar_name(clean_channel_name(channel_name), [clean_channel_name(name) for name in all_online_channel_names])
            if similar_name:
                original_name = next((name for name in all_online_channel_names if clean_channel_name(name) == similar_name), None)
                if original_name:
                    for online_category, online_channel_list in all_channels.items():
                        for online_channel_name, online_channel_url in online_channel_list:
                            if online_channel_name == original_name:
                                matched_channels[category].setdefault(channel_name, []).append(online_channel_url)
    return matched_channels
def merge_channels(target, source):
    for category, channel_list in source.items():
        if category in target:
            target[category].extend(channel_list)
        else:
            target[category] = channel_list
def is_ipv6(url):
    return re.match(r'^https?:\/\/\[[0-9a-fA-F:]+\]', url) is not None
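# Illustrative: is_ipv6("http://[2001:db8::1]:8080/live") is True; URLs with a hostname
# or dotted IPv4 address fall through to the IPv4 outputs.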
def updateChannelUrlsM3U(channels, template_channels, cache):
    written_urls_ipv4 = set()
    written_urls_ipv6 = set()
    url_changes = {"added": [], "removed": [], "modified": []}
    # Compare the current URLs against the cached ones
    if is_cache_valid(cache):
        previous_urls = {}
        for url_hash, entry in cache["urls"].items():
            for category, channel_list in entry["channels"].items():
                for channel_name, url in channel_list:
                    previous_urls[url] = (category, channel_name)
        # Detect URL changes
        current_urls = {}
        for category, channel_dict in channels.items():
            for channel_name, urls in channel_dict.items():
                for url in urls:
                    current_urls[url] = (category, channel_name)
        # Newly added URLs
        for url, (category, channel_name) in current_urls.items():
            if url not in previous_urls:
                url_changes["added"].append((category, channel_name, url))
        # Removed URLs
        for url, (category, channel_name) in previous_urls.items():
            if url not in current_urls:
                url_changes["removed"].append((category, channel_name, url))
    current_date = datetime.now().strftime("%Y-%m-%d")
    for group in config.announcements:
        for announcement in group['entries']:
            if announcement['name'] is None:
                announcement['name'] = current_date
    ipv4_m3u_path = os.path.join(output_folder, "live_ipv4.m3u")
    ipv4_txt_path = os.path.join(output_folder, "live_ipv4.txt")
    ipv6_m3u_path = os.path.join(output_folder, "live_ipv6.m3u")
    ipv6_txt_path = os.path.join(output_folder, "live_ipv6.txt")
    with open(ipv4_m3u_path, "w", encoding="utf-8") as f_m3u_ipv4, \
            open(ipv4_txt_path, "w", encoding="utf-8") as f_txt_ipv4, \
            open(ipv6_m3u_path, "w", encoding="utf-8") as f_m3u_ipv6, \
            open(ipv6_txt_path, "w", encoding="utf-8") as f_txt_ipv6:
        f_m3u_ipv4.write(f"""#EXTM3U x-tvg-url={",".join(f'"{epg_url}"' for epg_url in config.epg_urls)}\n""")
        f_m3u_ipv6.write(f"""#EXTM3U x-tvg-url={",".join(f'"{epg_url}"' for epg_url in config.epg_urls)}\n""")
        for group in config.announcements:
            f_txt_ipv4.write(f"{group['channel']},#genre#\n")
            f_txt_ipv6.write(f"{group['channel']},#genre#\n")
            for announcement in group['entries']:
                url = announcement['url']
                url = remove_unnecessary_params(url)
                if is_ipv6(url):
                    if url not in written_urls_ipv6 and is_valid_url(url):
                        written_urls_ipv6.add(url)
                        write_to_files(f_m3u_ipv6, f_txt_ipv6, group['channel'], announcement['name'], 1, url)
                else:
                    if url not in written_urls_ipv4 and is_valid_url(url):
                        written_urls_ipv4.add(url)
                        write_to_files(f_m3u_ipv4, f_txt_ipv4, group['channel'], announcement['name'], 1, url)
        for category, channel_list in template_channels.items():
            f_txt_ipv4.write(f"{category},#genre#\n")
            f_txt_ipv6.write(f"{category},#genre#\n")
            if category in channels:
                for channel_name in channel_list:
                    if channel_name in channels[category]:
                        sorted_urls_ipv4 = []
                        sorted_urls_ipv6 = []
                        for url in channels[category][channel_name]:
                            url = remove_unnecessary_params(url)
                            if is_ipv6(url):
                                if url not in written_urls_ipv6 and is_valid_url(url):
                                    sorted_urls_ipv6.append(url)
                                    written_urls_ipv6.add(url)
                            else:
                                if url not in written_urls_ipv4 and is_valid_url(url):
                                    sorted_urls_ipv4.append(url)
                                    written_urls_ipv4.add(url)
                        total_urls_ipv4 = len(sorted_urls_ipv4)
                        total_urls_ipv6 = len(sorted_urls_ipv6)
                        for index, url in enumerate(sorted_urls_ipv4, start=1):
                            new_url = add_url_suffix(url, index, total_urls_ipv4, "IPV4")
                            write_to_files(f_m3u_ipv4, f_txt_ipv4, category, channel_name, index, new_url)
                        for index, url in enumerate(sorted_urls_ipv6, start=1):
                            new_url = add_url_suffix(url, index, total_urls_ipv6, "IPV6")
                            write_to_files(f_m3u_ipv6, f_txt_ipv6, category, channel_name, index, new_url)
        f_txt_ipv4.write("\n")
        f_txt_ipv6.write("\n")
    # Append the URL change log
    if url_changes["added"] or url_changes["removed"] or url_changes["modified"]:
        with open(os.path.join(output_folder, "url_changes.log"), "a", encoding="utf-8") as f:
            f.write(f"\n=== Updated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===\n")
            if url_changes["added"]:
                f.write("\nAdded URLs:\n")
                for category, channel_name, url in url_changes["added"]:
                    f.write(f"- {category} - {channel_name}: {url}\n")
            if url_changes["removed"]:
                f.write("\nRemoved URLs:\n")
                for category, channel_name, url in url_changes["removed"]:
                    f.write(f"- {category} - {channel_name}: {url}\n")
            if url_changes["modified"]:
                f.write("\nModified URLs:\n")
                for category, channel_name, old_url, new_url in url_changes["modified"]:
                    f.write(f"- {category} - {channel_name}: {old_url} -> {new_url}\n")
def sort_and_filter_urls(urls, written_urls):
    filtered_urls = [
        url for url in sorted(urls, key=lambda u: not is_ipv6(u) if config.ip_version_priority == "ipv6" else is_ipv6(u))
        if url and url not in written_urls and not any(blacklist in url for blacklist in config.url_blacklist) and is_valid_url(url)
    ]
    written_urls.update(filtered_urls)
    return filtered_urls
def add_url_suffix(url, index, total_urls, ip_version):
    suffix = f"${ip_version}" if total_urls == 1 else f"${ip_version}•线路{index}"
    base_url = url.split('$', 1)[0] if '$' in url else url
    return f"{base_url}{suffix}"
def write_to_files(f_m3u, f_txt, category, channel_name, index, new_url):
    # Keep the metadata minimal
    f_m3u.write(f"#EXTINF:-1 group-title=\"{category}\",{channel_name}\n")
    f_m3u.write(new_url + "\n")
    f_txt.write(f"{channel_name},{new_url}\n")
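# Illustrative output of write_to_files for category "央视", channel "CCTV1" and a made-up URL:
#   .m3u: #EXTINF:-1 group-title="央视",CCTV1  (followed by the URL on its own line)
#   .txt: CCTV1,http://example.com/cctv1.m3u8$IPV4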
if __name__ == "__main__":
    template_file = "demo.txt"
    try:
        # Check that the template file exists
        if not os.path.exists(template_file):
            print(f"Error: template file '{template_file}' not found")
            print("Make sure a demo.txt file exists in the project directory.")
            print("Example content:")
            print("""
# Example demo.txt content
央视,#genre#
CCTV-1
CCTV-2
卫视,#genre#
北京卫视
上海卫视
广东卫视
""")
            import sys
            sys.exit(1)
        channels, template_channels, cache = asyncio.run(filter_source_urls(template_file))
        updateChannelUrlsM3U(channels, template_channels, cache)
        print("Done. Results have been saved to the live folder.")
    except Exception as e:
        print(f"An error occurred during execution: {e}")
        logging.error(f"Program run failed: {e}")