-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathiptv_validator.py
More file actions
302 lines (258 loc) · 11.3 KB
/
iptv_validator.py
File metadata and controls
302 lines (258 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
IPTV直播地址验证工具
自动检查IPTV.txt文件中的直播链接是否可用
"""
import requests
import time
import re
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from datetime import datetime
class IPTVValidator:
    """Check which stream URLs listed in an IPTV playlist text file respond.

    The playlist format handled here is line based: a line ending in
    ``,#genre#`` starts a category, and a ``name,url`` line declares a channel
    that belongs to the most recent category.
    """

    # Suffix that marks a category line.
    _GENRE_SUFFIX = ',#genre#'
    # HEAD answers that mean "this method is not supported" rather than
    # "this stream is dead"; such URLs are probed again with GET.
    _HEAD_UNSUPPORTED_STATUSES = (405, 501)
    # Fragments of a Content-Type header that are accepted as a stream.
    _ACCEPTED_CONTENT_TYPES = ('video', 'application', 'text')

    def __init__(self, timeout=10, max_workers=20):
        """Create a validator.

        Args:
            timeout: Timeout passed to every HTTP request.
            max_workers: Size of the thread pool used by ``validate_all``.
        """
        self.timeout = timeout
        self.max_workers = max_workers
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def parse_iptv_file(self, filename):
        """Parse a playlist file into a list of channel dicts.

        Args:
            filename: Path of the playlist text file.

        Returns:
            A list of dicts with the keys ``line_num`` (1-based line in the
            file), ``name``, ``url`` and ``genre``. An empty list is returned
            when the file cannot be read or decoded; the error is printed.
        """
        channels = []
        current_genre = ""
        try:
            # 'utf-8-sig' also reads plain UTF-8 but drops a leading BOM, which
            # would otherwise end up inside the first genre or channel name.
            with open(filename, 'r', encoding='utf-8-sig') as f:
                for line_num, raw_line in enumerate(f, 1):
                    line = raw_line.strip()
                    if not line:
                        continue
                    # Category line
                    if line.endswith(self._GENRE_SUFFIX):
                        current_genre = line.replace(self._GENRE_SUFFIX, '')
                        continue
                    # Channel line: "name,url" containing an http(s) address
                    if ',' not in line:
                        continue
                    if 'http://' not in line and 'https://' not in line:
                        continue
                    name, url = line.split(',', 1)
                    channels.append({
                        'line_num': line_num,
                        'name': name.strip(),
                        'url': url.strip(),
                        'genre': current_genre
                    })
        except (OSError, UnicodeError) as e:
            print(f"解析文件时出错: {e}")
            return []
        return channels

    @staticmethod
    def _error_result(channel, message):
        """Build the result dict used for a channel that failed validation."""
        return {
            'status': 'error',
            'channel': channel,
            'message': message
        }

    @staticmethod
    def _in_file_order(results):
        """Return results sorted by the source line of their channel.

        The sort is stable, so results whose channel has no ``line_num`` keep
        their relative order.
        """
        return sorted(results, key=lambda r: r['channel'].get('line_num', 0))

    def _probe(self, url):
        """Request the headers of ``url`` and return the response.

        A HEAD request is tried first. If the server answers that HEAD is not
        supported, the URL is requested again with a streaming GET so that only
        the headers are fetched; the body is never downloaded.
        """
        response = self.session.head(url, timeout=self.timeout, allow_redirects=True)
        if response.status_code not in self._HEAD_UNSUPPORTED_STATUSES:
            return response
        response.close()
        response = self.session.get(
            url, timeout=self.timeout, allow_redirects=True, stream=True
        )
        # Status, headers and elapsed time stay readable after closing.
        response.close()
        return response

    def _classify_response(self, channel, response):
        """Turn an HTTP response into a success / warning / error result."""
        status_code = response.status_code
        if status_code != 200:
            return {
                'status': 'error',
                'channel': channel,
                'status_code': status_code,
                'message': f'HTTP {status_code}'
            }
        content_type = response.headers.get('Content-Type', '').lower()
        result = {
            'status': 'success',
            'channel': channel,
            'response_time': response.elapsed.total_seconds(),
            'status_code': status_code,
            'content_type': content_type
        }
        if not any(kind in content_type for kind in self._ACCEPTED_CONTENT_TYPES):
            # Reachable, but the payload does not look like a stream.
            result['status'] = 'warning'
            result['message'] = 'Content-Type可能不正确'
        return result

    def validate_url(self, channel):
        """Check whether the URL of a single channel is reachable.

        Args:
            channel: Channel dict as produced by ``parse_iptv_file``.

        Returns:
            A result dict with ``status`` (``'success'``, ``'warning'`` or
            ``'error'``) and ``channel``, plus ``status_code``,
            ``response_time``, ``content_type`` and ``message`` where
            applicable. Errors are reported in the result, not raised.
        """
        try:
            response = self._probe(channel['url'])
            return self._classify_response(channel, response)
        except requests.exceptions.Timeout:
            return self._error_result(channel, '连接超时')
        except requests.exceptions.ConnectionError:
            return self._error_result(channel, '连接错误')
        except requests.exceptions.RequestException as e:
            return self._error_result(channel, f'请求错误: {str(e)}')
        except Exception as e:
            # One bad channel must not abort the whole validation run.
            return self._error_result(channel, f'未知错误: {str(e)}')

    def validate_all(self, channels):
        """Validate all channels concurrently, printing progress.

        Args:
            channels: List of channel dicts.

        Returns:
            A list with one result dict per channel, in completion order.
        """
        results = []
        total = len(channels)
        print(f"开始验证 {total} 个频道...")
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit every task up front
            future_to_channel = {
                executor.submit(self.validate_url, channel): channel
                for channel in channels
            }
            # Collect results as they finish
            for i, future in enumerate(as_completed(future_to_channel), 1):
                try:
                    result = future.result()
                    results.append(result)
                    # Progress line
                    if result['status'] == 'success':
                        print(f"[{i}/{total}] ✅ {result['channel']['name']}")
                    elif result['status'] == 'warning':
                        print(f"[{i}/{total}] ⚠️ {result['channel']['name']} - {result.get('message', '')}")
                    else:
                        print(f"[{i}/{total}] ❌ {result['channel']['name']} - {result.get('message', '')}")
                except Exception as e:
                    channel = future_to_channel[future]
                    results.append(self._error_result(channel, f'验证异常: {str(e)}'))
                    print(f"[{i}/{total}] ❌ {channel['name']} - 验证异常")
        return results

    def generate_report(self, results, output_file=None):
        """Build the validation report, print it and optionally save it.

        Channels are listed in playlist order. An empty ``results`` list
        produces a report with zero counts.

        Args:
            results: Result dicts as returned by ``validate_all``.
            output_file: Optional path the report is written to.

        Returns:
            The report text.
        """
        # Group once; a result with any other status only counts in the total.
        grouped = {'success': [], 'warning': [], 'error': []}
        for result in self._in_file_order(results):
            if result['status'] in grouped:
                grouped[result['status']].append(result)
        total = len(results)
        success = len(grouped['success'])
        warning = len(grouped['warning'])
        error = len(grouped['error'])

        def percent(count):
            # Guard against dividing by zero when there are no results.
            return count / total * 100 if total else 0.0

        report = []
        report.append("=" * 60)
        report.append("IPTV直播地址验证报告")
        report.append("=" * 60)
        report.append(f"验证时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"总频道数: {total}")
        report.append(f"可用频道: {success} ({percent(success):.1f}%)")
        report.append(f"警告频道: {warning} ({percent(warning):.1f}%)")
        report.append(f"无效频道: {error} ({percent(error):.1f}%)")
        report.append("")
        # Details
        if grouped['error']:
            report.append("❌ 无效频道列表:")
            report.append("-" * 40)
            for result in grouped['error']:
                channel = result['channel']
                report.append(f"• {channel['name']} ({channel['genre']})")
                report.append(f" URL: {channel['url']}")
                report.append(f" 错误: {result.get('message', '未知错误')}")
                report.append(f" 行号: {channel['line_num']}")
                report.append("")
        if grouped['warning']:
            report.append("⚠️ 警告频道列表:")
            report.append("-" * 40)
            for result in grouped['warning']:
                channel = result['channel']
                report.append(f"• {channel['name']} ({channel['genre']})")
                report.append(f" URL: {channel['url']}")
                report.append(f" 警告: {result.get('message', '')}")
                report.append(f" 行号: {channel['line_num']}")
                report.append("")
        if grouped['success']:
            report.append("✅ 可用频道列表:")
            report.append("-" * 40)
            for result in grouped['success']:
                channel = result['channel']
                response_time = result.get('response_time', 0)
                report.append(f"• {channel['name']} ({channel['genre']}) - {response_time:.2f}s")
        report_text = "\n".join(report)
        # Console output
        print("\n" + report_text)
        # Optional file output
        if output_file:
            try:
                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write(report_text)
                print(f"\n报告已保存到: {output_file}")
            except (OSError, UnicodeError) as e:
                print(f"保存报告时出错: {e}")
        return report_text

    def generate_clean_iptv(self, results, output_file=None):
        """Build a playlist that contains only the channels that validated.

        Genres and channels keep the order of the source playlist, whatever
        order the results arrive in.

        Args:
            results: Result dicts as returned by ``validate_all``.
            output_file: Optional path the playlist is written to.

        Returns:
            The playlist text, or ``None`` when no channel has status
            ``'success'``.
        """
        success_results = [
            r for r in self._in_file_order(results) if r['status'] == 'success'
        ]
        if not success_results:
            print("没有可用的频道,无法生成清理后的文件")
            return None
        # Group channels by genre; dicts keep first-seen order.
        genres = {}
        for result in success_results:
            channel = result['channel']
            genres.setdefault(channel['genre'], []).append(channel)
        # Build the new file content
        lines = []
        for genre, channels in genres.items():
            lines.append(f"{genre},#genre#")
            for channel in channels:
                lines.append(f"{channel['name']},{channel['url']}")
            lines.append("")  # blank line between genres
        content = "\n".join(lines)
        if output_file:
            try:
                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write(content)
                print(f"清理后的IPTV文件已保存到: {output_file}")
            except (OSError, UnicodeError) as e:
                print(f"保存清理文件时出错: {e}")
        return content
def main(iptv_file="IPTV.txt"):
    """Validate a playlist and write a report plus a cleaned playlist.

    Parses ``iptv_file``, validates every channel found in it, then writes a
    validation report and a playlist containing only the working channels to
    timestamped files in the current directory.

    Args:
        iptv_file: Path of the playlist to validate.
    """
    print("IPTV直播地址验证工具")
    print("=" * 40)
    # Create the validator
    validator = IPTVValidator(timeout=10, max_workers=20)
    # Parse the playlist
    print(f"正在解析文件: {iptv_file}")
    channels = validator.parse_iptv_file(iptv_file)
    if not channels:
        print("没有找到有效的频道信息")
        return
    print(f"找到 {len(channels)} 个频道")
    # Validate every channel
    results = validator.validate_all(channels)
    # One timestamp for both outputs, so the two files of a run always share
    # the same suffix.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    # Write the report
    report_file = f"iptv_validation_report_{timestamp}.txt"
    validator.generate_report(results, report_file)
    # Write the cleaned playlist
    clean_file = f"IPTV_clean_{timestamp}.txt"
    validator.generate_clean_iptv(results, clean_file)
    print("\n验证完成!")
# Run the validation only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()