第一个源数据采样完成

This commit is contained in:
EvilCult 2019-05-22 18:41:21 +08:00 committed by GitHub
commit 3ebaa37cd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 274 additions and 1 deletions

5
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"python.pythonPath": "/usr/local/bin/python",
"python.linting.pylintEnabled": true,
"python.linting.enabled": true
}

View File

@ -1 +1,2 @@
# iptv-m3u-maker
# 测试中...
到底是个什么东西,我自己也不太清楚....

89
db.py Executable file
View File

@ -0,0 +1,89 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sqlite3
import getpass
import os
class DataBase :
def __init__ (self) :
user = getpass.getuser()
self.dbAddress = os.getcwd() + '/'
self.table = 'lists'
if self.connect() == False:
self.connStat = False
else :
self.connStat = True
self.chkTable()
def __del__ (self) :
if self.connStat == True :
self.disConn()
def connect (self) :
try:
if not os.path.exists(self.dbAddress) :
os.makedirs(self.dbAddress)
self.dbAddress += 'db.sqlite3'
self.conn = sqlite3.connect(self.dbAddress)
self.cur = self.conn.cursor()
return True
except Exception, e:
return False
def create (self) :
if self.connStat == False : return False
sql = 'create table ' + self.table + ' (id integer PRIMARY KEY autoincrement, title text, quality text, url text, enable integer, online integer, update text)'
self.cur.execute(sql)
def query (self, sql) :
if self.connStat == False : return False
self.cur.execute(sql)
values = self.cur.fetchall()
return values
def insert (self, data):
if self.connStat == False : return False
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
keyList = []
valList = []
for k, v in data.iteritems():
keyList.append(k)
valList.append(str(v).replace('"','\"').replace("'","''"))
sql = "insert into " + self.table + " (`" + '`, `'.join(keyList) + "`) values ('" + "', '".join(valList) + "')"
self.cur.execute(sql)
self.conn.commit()
def disConn (self) :
if self.connStat == False : return False
self.cur.close()
self.conn.close()
def chkTable (self) :
if self.connStat == False : return False
sql = "SELECT tbl_name FROM sqlite_master WHERE type='table'"
tableStat = False
self.cur.execute(sql)
values = self.cur.fetchall()
for x in values:
if self.table in x :
tableStat = True
if tableStat == False :
self.create()

69
main.py Normal file
View File

@ -0,0 +1,69 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import tools
import re
class Iptv :
def __init__ (self) :
self.T = tools.Tools()
def getSourceA (self) :
url = 'https://www.jianshu.com/p/2499255c7e79'
req = [
'user-agent: Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Mobile Safari/537.36',
]
res = self.T.getPage(url, req)
if res['code'] == 200 :
pattern = re.compile(r"<code(.*?)</code>", re.I|re.S)
tmp = pattern.findall(res['body'])
pattern = re.compile(r"#EXTINF:0,(.*?)\n#EXTVLCOPT:network-caching=1000\n(.*?)\n", re.I|re.S)
sourceList = pattern.findall(tmp[0])
sourceList = sourceList + pattern.findall(tmp[1])
for item in sourceList :
playable = self.chkPlayable(item[1])
if playable == True :
info = self.fmtTitle(item[0])
print('title: ' + str(info['id']) + ' ' + str(info['title']))
print('url: ' + str(item[1]))
else :
pass # MAYBE later :P
else :
pass # MAYBE later :P
def chkPlayable (self, url) :
try:
res = self.T.getPage(url)
if res['code'] == 200 :
return True
else:
return False
except:
return False
def baseFilter (self) :
pass
def fmtTitle (self, string) :
pattern = re.compile(r"(cctv[-|\s]*\d*)*\s*?([^fhd|^hd|^sd|^\.m3u8]*)\s*?(fhd|hd|sd)*", re.I)
tmp = pattern.findall(string)[0]
result = {
'id' : tmp[0].strip('-').strip(),
'title' : tmp[1].strip('-').strip(),
'quality': tmp[2].strip('-').strip(),
}
return result
obj = Iptv()
obj.getSourceA()

109
tools.py Normal file
View File

@ -0,0 +1,109 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import urllib
import urllib2
import re
import ssl
import os
import platform
import sys
import StringIO
import gzip
import random
import socket
socket.setdefaulttimeout(5.0)
class Tools :
def __init__ (self) :
pass
def getPage (self, url, requestHeader = [], postData = {}) :
fakeIp = self.fakeIp()
requestHeader.append('CLIENT-IP:' + fakeIp)
requestHeader.append('X-FORWARDED-FOR:' + fakeIp)
if postData == {} :
request = urllib2.Request(url)
elif isinstance(postData, basestring) :
request = urllib2.Request(url, postData)
else :
request = urllib2.Request(url, urllib.urlencode(postData))
for x in requestHeader :
headerType = x.split(':')[0]
headerCon = x.replace(headerType + ':', '')
request.add_header(headerType, headerCon)
try :
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
response = urllib2.urlopen(request, context = ctx)
header = response.headers
body = response.read()
code = response.code
except urllib2.HTTPError as e:
header = e.headers
body = e.read()
code = e.code
result = {
'code': code,
'header': header,
'body': body
}
return result
def fakeIp (self) :
fakeIpList = []
for x in xrange(0, 4):
fakeIpList.append(str(int(random.uniform(0, 255))))
fakeIp = '.'.join(fakeIpList)
return fakeIp
def fmtCookie (self, string) :
result = re.sub(r"path\=\/.", "", string)
result = re.sub(r"(\S*?)\=deleted.", "", result)
result = re.sub(r"expires\=(.*?)GMT;", "", result)
result = re.sub(r"domain\=(.*?)tv.", "", result)
result = re.sub(r"httponly", "", result)
result = re.sub(r"\s", "", result)
return result
def urlencode(self, str) :
reprStr = repr(str).replace(r'\x', '%')
return reprStr[1:-1]
def gzdecode(self, data) :
try:
compressedstream = StringIO.StringIO(data)
gziper = gzip.GzipFile(fileobj = compressedstream)
html = gziper.read()
return html
except Exception as e:
return data
def getRes (self, fileName) :
if getattr(sys, 'frozen', False):
base_path = os.path.join(sys._MEIPASS, 'RES')
else:
base_path = os.path.join(os.path.abspath("../"), 'Resources')
filePath = os.path.join(base_path, fileName)
return filePath
def isWin (self) :
osType = platform.system()
if osType == 'Windows' :
return True
else :
return False