diff --git a/PR_DESCRIPTION.md b/PR_DESCRIPTION.md new file mode 100644 index 0000000..0d9f469 --- /dev/null +++ b/PR_DESCRIPTION.md @@ -0,0 +1,434 @@ +# feat: 域名级QPS限流功能 + +## 概述 + +本PR为feapder框架新增**域名级QPS限流**功能,支持对不同域名配置独立的请求频率限制,同时支持单机(AirSpider)和分布式(Spider/BatchSpider/TaskSpider)两种模式。 + +## 原始架构分析 + +### AirSpider 原始架构 + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ AirSpider │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ start_requests() │ +│ │ │ +│ ▼ │ +│ ┌─────────┐ put() ┌──────────┐ get() ┌────────┐ │ +│ │ Request │ ──────────▶ │ MemoryDB │ ──────────▶ │ Parser │ │ +│ │ 生成 │ │ (队列) │ │ Control│ │ +│ └─────────┘ └──────────┘ └────────┘ │ +│ │ │ +│ ▼ │ +│ download │ +│ & parse │ +└─────────────────────────────────────────────────────────────────┘ + +特点: +- 单进程内存队列 +- 无请求频率控制 +- 多线程并发消费 +``` + +### 分布式爬虫(Spider/BatchSpider/TaskSpider)原始架构 + +``` +┌──────────────────────────────────────────────────────────────────────┐ +│ Scheduler (分布式调度器) │ +├──────────────────────────────────────────────────────────────────────┤ +│ │ +│ start_requests() │ +│ │ │ +│ ▼ │ +│ ┌─────────┐ put() ┌───────────┐ get() ┌───────────────────┐ │ +│ │ Request │ ────────▶ │ Redis │ ──────▶ │ Collector │ │ +│ │ 生成 │ │ (队列) │ │ (批量获取+去重) │ │ +│ └─────────┘ └───────────┘ └───────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────┐ │ +│ │ ParserControl │ │ +│ │ (多线程消费) │ │ +│ └───────────────────┘ │ +│ │ │ +│ ▼ │ +│ download & parse │ +└──────────────────────────────────────────────────────────────────────┘ + +特点: +- Redis分布式队列 +- 支持多进程/多机器部署 +- Collector负责批量获取和去重 +- 无请求频率控制 +``` + +## 新增QPS限流后的架构 + +### AirSpider QPS架构 + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ AirSpider (QPS模式) │ +├─────────────────────────────────────────────────────────────────────────┤ +│ │ +│ start_requests() │ +│ │ │ +│ ▼ │ +│ ┌─────────┐ put() ┌──────────┐ get_nowait() ┌───────────────┐ │ +│ │ Request │ ────────▶ │ MemoryDB │ ─────────────▶ │ QPSScheduler │ │ +│ │ 生成 │ │ (队列) │ (批量) │ │ │ +│ └─────────┘ └──────────┘ │ ┌───────────┐ │ │ +│ │ │DomainRate │ │ │ +│ │ │ Limiter │ │ │ +│ ┌──────────────┐ get_ready_request() │ ├───────────┤ │ │ +│ │ParserControl │ ◀────────────────────────────── │ │DelayHeap │ │ │ +│ │ (多线程) │ │ ├───────────┤ │ │ +│ └──────────────┘ │ │ReadyQueue │ │ │ +│ │ │ └───────────┘ │ │ +│ ▼ └───────────────┘ │ +│ download & parse │ +└─────────────────────────────────────────────────────────────────────────┘ + +QPS控制流程: +1. Request 进入 MemoryDB +2. QPSScheduler 批量获取请求 +3. DomainRateLimiter 根据域名获取令牌 + - 有令牌:直接进入 ReadyQueue + - 无令牌:计算等待时间,进入 DelayHeap +4. 调度线程定时检查 DelayHeap,到期请求移入 ReadyQueue +5. 
ParserControl 从 ReadyQueue 获取请求处理 +``` + +### 分布式爬虫 QPS架构 + +``` +┌──────────────────────────────────────────────────────────────────────────┐ +│ Scheduler (分布式QPS模式) │ +├──────────────────────────────────────────────────────────────────────────┤ +│ │ +│ start_requests() │ +│ │ │ +│ ▼ │ +│ ┌─────────┐ put() ┌───────────┐ get() ┌───────────┐ │ +│ │ Request │ ──────▶ │ Redis │ ────▶ │ Collector │ │ +│ │ 生成 │ │ (队列) │ └─────┬─────┘ │ +│ └─────────┘ └───────────┘ │ │ +│ ▼ │ +│ ┌───────────────┐ │ +│ │ QPSScheduler │ │ +│ │ │ │ +│ ┌──────────────┐ │ ┌───────────┐ │ │ +│ │ParserControl │ ◀─────────────────│ │DomainRate │ │ │ +│ │ (多线程) │ get_ready_request │ │ Limiter │ │ │ +│ └──────────────┘ │ │ (Redis) │ │ ◀── 多进程共享 │ +│ │ │ ├───────────┤ │ │ +│ ▼ │ │DelayHeap │ │ │ +│ download & parse │ ├───────────┤ │ │ +│ │ │ReadyQueue │ │ │ +│ │ └───────────┘ │ │ +│ └───────────────┘ │ +└──────────────────────────────────────────────────────────────────────────┘ + +进程A ─┐ + ├──▶ RedisTokenBucket ──▶ 统一QPS配额 +进程B ─┘ + +分布式特性: +- 令牌桶存储在Redis中(Lua脚本保证原子性) +- 多进程/多机器共享同一QPS配额 +- 支持跨进程的精确QPS控制 +``` + +## 核心组件 + +### 1. 令牌桶算法 (`feapder/utils/rate_limiter.py`) + +| 类名 | 说明 | 适用场景 | +|-----|------|---------| +| `LocalTokenBucket` | 本地内存令牌桶,线程安全 | AirSpider / 单进程 | +| `RedisTokenBucket` | Redis分布式令牌桶,Lua脚本保证原子性 | Spider / 多进程分布式 | +| `DomainRateLimiter` | 域名级限流管理器,自动路由到对应令牌桶 | 统一接口 | + +**令牌桶特性:** +- 预扣模式:请求到达时立即预扣令牌,返回等待时间 +- 支持令牌为负数,确保后续请求正确排队 +- 初始令牌数为1(而非桶容量),避免启动时突发 + +### 2. QPS调度器 (`feapder/core/schedulers/qps_scheduler.py`) + +```python +class QPSScheduler: + """ + 单线程调度器,负责: + 1. 接收请求,通过DomainRateLimiter获取令牌 + 2. 立即可执行的请求进入ReadyQueue + 3. 需等待的请求进入DelayHeap(按到期时间排序) + 4. 调度线程定时将到期请求从DelayHeap移入ReadyQueue + """ +``` + +### 3. 配置项 (`feapder/setting.py`) + +```python +# 域名级QPS限流配置 +DOMAIN_RATE_LIMIT_ENABLE = False # 是否启用 +DOMAIN_RATE_LIMIT_DEFAULT = 0 # 默认QPS,0表示不限制 +DOMAIN_RATE_LIMIT_RULES = {} # 域名规则 {"www.baidu.com": 2, "*.taobao.com": 5} +DOMAIN_RATE_LIMIT_MAX_PREFETCH = 100 # 最大预取数 +DOMAIN_RATE_LIMIT_STORAGE = "local" # 存储模式:local/redis +``` + +## 文件变更清单 + +### 新增文件 + +| 文件 | 行数 | 说明 | +|-----|-----|------| +| `feapder/utils/rate_limiter.py` | 282 | 令牌桶算法实现 | +| `feapder/core/schedulers/__init__.py` | 12 | 调度模块入口 | +| `feapder/core/schedulers/qps_scheduler.py` | 339 | QPS调度器 | +| `tests/qps-scheduler/*.py` | ~1000 | 单元测试和集成测试 | + +### 修改文件 + +| 文件 | 改动 | 说明 | +|-----|-----|------| +| `feapder/setting.py` | +15 | 新增QPS配置项 | +| `feapder/db/memorydb.py` | +24 | 新增 `get_nowait()` 非阻塞方法 | +| `feapder/core/spiders/air_spider.py` | +29 | AirSpider QPS集成 | +| `feapder/core/parser_control.py` | +68 | ParserControl QPS集成 | +| `feapder/core/scheduler.py` | +30 | 分布式爬虫 QPS集成 | +| `feapder/templates/project_template/setting.py` | +15 | 模板配置更新 | + +## 设计原则 + +### 1. 零侵入性 +- **QPS关闭时**:代码流程与原始完全一致,无任何性能损耗 +- **QPS开启时**:仅在获取请求环节增加调度逻辑 + +### 2. 向后兼容 +- 所有配置项默认关闭 +- 不影响现有爬虫的任何行为 + +### 3. 精确控制 +- 令牌桶预扣机制确保QPS精度 +- 测试验证:配置2 QPS,实际误差 < 2% + +### 4. 分布式支持 +- Redis令牌桶 + Lua脚本保证原子性 +- 多进程共享QPS配额 + +## 测试验证 + +### 测试1:单机QPS精度测试 + +**测试场景**:32线程,baidu.com 配置 2 QPS,每域名20个请求 + +**测试命令**: +```bash +PYTHONPATH=. 
python tests/qps-scheduler/test_mixed_qps_comparison.py +``` + +**测试结果**: +``` +====================================================================== +测试①:QPS开启,混合限制(baidu=2QPS,sogou=不限制) +====================================================================== +配置: 32线程, 每域名20请求 + - www.baidu.com: 限制 2 QPS + - www.sogou.com: 不限制 (default=0) + - 模拟处理时间: 10ms + +总耗时: 10.01秒 +调度器统计: {'submitted': 40, 'immediate': 21, 'delayed': 19, 'ready': 40} + +baidu.com: + 处理请求: 20 + 时间跨度: 9.501秒 + 实际QPS: 2.00 + +sogou.com: + 处理请求: 20 + 时间跨度: 0.000秒 + 实际QPS: 334839.39 (不限制,瞬间完成) + +✅ baidu.com QPS控制精确 (配置2, 实际2.00) +``` + +**验证结论**: + +| 指标 | 配置值 | 实际值 | 误差 | +|-----|-------|-------|-----| +| baidu.com QPS | 2 | 2.00 | **0.0%** | +| 时间跨度 | 9.5秒 | 9.50秒 | **0.0%** | + +--- + +### 测试2:混合限制测试(有限制 + 无限制) + +**测试场景**:验证有限制域名被控制,无限制域名不受影响 + +**测试结果**: +``` +====================================================================== +对比分析结果 +====================================================================== + +【1】QPS限制精度验证(baidu.com, 配置2QPS) +-------------------------------------------------- + 配置QPS: 2 + 实际QPS: 2.00 + 误差: 0.0% + ✅ QPS限制精确控制 + +【2】不限制域名性能对比(sogou.com) +-------------------------------------------------- + QPS开启(不限制域名): + 时间跨度: 0.000秒 + 实际QPS: 334839.39 + QPS关闭: + 时间跨度: 0.002秒 + 实际QPS: 249817.48 + + ✅ 两种模式时间跨度都接近0,说明请求几乎瞬间被处理(不限制情况下) +``` + +**验证结论**: +- ✅ 有限制域名(baidu):QPS被精确控制在 2.00 +- ✅ 无限制域名(sogou):不受QPS架构影响,瞬间完成 + +--- + +### 测试3:分布式多进程QPS共享测试 + +**测试场景**:2个进程共享 baidu.com = 2 QPS 配额,使用Redis令牌桶 + +**测试命令**: +```bash +PYTHONPATH=. python tests/qps-scheduler/test_distributed_qps.py +``` + +**测试结果**: +``` +====================================================================== +分布式QPS测试 - 多进程共享QPS配额 +====================================================================== + +配置: + - 进程数: 2 + - 每进程请求数: 10 + - 总请求数: 20 + - 目标共享QPS: 2 + - 预期总耗时: 9.5秒 + +启动 2 个进程... +[进程0] 获取请求 1/10: https://www.baidu.com/process0/page0 +[进程0] 获取请求 2/10: https://www.baidu.com/process0/page1 +... +[进程0] 完成,共处理 10 个请求 +[进程1] 获取请求 1/10: https://www.baidu.com/process1/page0 +... +[进程1] 完成,共处理 10 个请求 + +====================================================================== +测试结果 +====================================================================== + +总耗时: 10.01秒 + +进程0: + 请求数: 10 + 时间跨度: 8.31秒 + 单进程QPS: 1.08 + +进程1: + 请求数: 10 + 时间跨度: 9.32秒 + 单进程QPS: 0.97 + +-------------------------------------------------- +【合并统计 - 所有进程】 +-------------------------------------------------- + 总请求数: 20 + 总时间跨度: 9.34秒 + 实际共享QPS: 2.03 + 配置QPS: 2 + 误差: 1.7% + +✅ 分布式QPS共享控制精确! + 2个进程共享 2 QPS 配额,实际 2.03 QPS + +时间跨度验证: + 预期: 9.5秒 + 实际: 9.3秒 +``` + +**验证结论**: + +| 指标 | 预期值 | 实际值 | 误差 | +|-----|-------|-------|-----| +| 共享QPS | 2 | 2.03 | **1.7%** | +| 总时间跨度 | 9.5秒 | 9.34秒 | **1.7%** | +| 进程0 QPS | ~1 | 1.08 | - | +| 进程1 QPS | ~1 | 0.97 | - | + +- ✅ 两个进程合计QPS精确控制在配置值 +- ✅ Redis令牌桶正确实现跨进程共享 + +--- + +### 测试文件清单 + +| 测试文件 | 说明 | +|---------|------| +| `test_rate_limiter.py` | 令牌桶单元测试(15个用例) | +| `test_qps_scheduler.py` | QPS调度器单元测试(11个用例) | +| `test_mixed_qps_comparison.py` | 混合限制对比测试 | +| `test_distributed_qps.py` | 分布式多进程测试 | +| `test_feapder_integration.py` | AirSpider集成测试 | +| `test_multi_domain_qps.py` | 多域名QPS测试 | +| `test_air_spider_qps.py` | AirSpider QPS测试 | +| `test_performance_only.py` | 性能测试 | +| `test_realistic_performance.py` | 真实场景性能测试 | + +**运行所有测试**: +```bash +PYTHONPATH=. 
python -m pytest tests/qps-scheduler/ -v +``` + +**测试结果**:38个测试全部通过 + +## 使用示例 + +### AirSpider 使用 + +```python +class MySpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_RULES": { + "www.baidu.com": 2, # 百度限制 2 QPS + "*.taobao.com": 5, # 淘宝全域名限制 5 QPS + }, + "DOMAIN_RATE_LIMIT_DEFAULT": 10, # 其他域名默认 10 QPS + } +``` + +### 分布式爬虫使用 + +```python +class MySpider(Spider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_STORAGE": "redis", # 使用Redis共享配额 + "DOMAIN_RATE_LIMIT_RULES": { + "api.example.com": 10, + }, + } +``` + +## 作者 + +- **ShellMonster** diff --git a/docs/_sidebar.md b/docs/_sidebar.md index bef51b3..ec42095 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -18,6 +18,7 @@ * 使用进阶 * [请求-Request](source_code/Request.md) * [响应-Response](source_code/Response.md) + * [域名级QPS限流](source_code/域名级QPS限流.md) * [代理使用说明](source_code/proxy.md) * [用户池说明](source_code/UserPool.md) * [浏览器渲染-Selenium](source_code/浏览器渲染-Selenium.md) diff --git "a/docs/source_code/\345\237\237\345\220\215\347\272\247QPS\351\231\220\346\265\201.md" "b/docs/source_code/\345\237\237\345\220\215\347\272\247QPS\351\231\220\346\265\201.md" new file mode 100644 index 0000000..f0d35c6 --- /dev/null +++ "b/docs/source_code/\345\237\237\345\220\215\347\272\247QPS\351\231\220\346\265\201.md" @@ -0,0 +1,279 @@ +# 域名级QPS限流 + +域名级QPS限流功能可以针对不同域名配置独立的请求频率限制,防止因请求过快被目标网站封禁,同时支持单机和分布式两种模式。 + +## 1. 功能特点 + +- **域名级别控制**:可为不同域名配置不同的QPS限制 +- **精确控制**:基于令牌桶算法,QPS控制误差 < 2% +- **通配符支持**:支持 `*.example.com` 匹配所有子域名 +- **分布式支持**:多进程/多机器可共享同一QPS配额 +- **零侵入性**:关闭时不影响原有流程和性能 + +## 2. 配置说明 + +在 `setting.py` 或 `__custom_setting__` 中配置: + +```python +# 域名级QPS限流配置 +DOMAIN_RATE_LIMIT_ENABLE = False # 是否启用,默认关闭 +DOMAIN_RATE_LIMIT_DEFAULT = 0 # 默认QPS限制,0表示不限制 +DOMAIN_RATE_LIMIT_RULES = {} # 域名QPS规则 +DOMAIN_RATE_LIMIT_MAX_PREFETCH = 100 # 最大预取请求数 +DOMAIN_RATE_LIMIT_STORAGE = "local" # 存储模式:local/redis +``` + +### 配置项详解 + +| 配置项 | 类型 | 默认值 | 说明 | +|-------|------|-------|------| +| `DOMAIN_RATE_LIMIT_ENABLE` | bool | False | 是否启用QPS限流 | +| `DOMAIN_RATE_LIMIT_DEFAULT` | int | 0 | 默认QPS,0表示不限制 | +| `DOMAIN_RATE_LIMIT_RULES` | dict | {} | 域名QPS规则字典 | +| `DOMAIN_RATE_LIMIT_MAX_PREFETCH` | int | 100 | 最大预取数,防止内存溢出 | +| `DOMAIN_RATE_LIMIT_STORAGE` | str | "local" | 存储模式,local或redis | + +### 存储模式 + +| 模式 | 说明 | 适用场景 | +|-----|------|---------| +| `local` | 本地内存存储 | AirSpider、单进程爬虫 | +| `redis` | Redis分布式存储 | 多进程/多机器部署,需共享QPS配额 | + +## 3. 
使用示例 + +### AirSpider 使用 + +```python +import feapder + + +class MySpider(feapder.AirSpider): + __custom_setting__ = dict( + DOMAIN_RATE_LIMIT_ENABLE=True, + DOMAIN_RATE_LIMIT_RULES={ + "www.baidu.com": 2, # 百度限制 2 QPS + "*.taobao.com": 5, # 淘宝全域名限制 5 QPS + }, + DOMAIN_RATE_LIMIT_DEFAULT=10, # 其他域名默认 10 QPS + ) + + def start_requests(self): + yield feapder.Request("https://www.baidu.com/s?wd=test1") + yield feapder.Request("https://www.baidu.com/s?wd=test2") + yield feapder.Request("https://item.taobao.com/item1") + yield feapder.Request("https://detail.taobao.com/item2") + + def parse(self, request, response): + print(f"处理: {request.url}") + + +if __name__ == "__main__": + MySpider(thread_count=10).start() +``` + +上述代码中: +- `www.baidu.com` 的请求频率被限制为每秒2次 +- `*.taobao.com` 匹配 `item.taobao.com`、`detail.taobao.com` 等,限制为每秒5次 +- 其他未匹配的域名,使用默认限制每秒10次 + +### Spider 分布式使用 + +```python +import feapder + + +class MyDistributedSpider(feapder.Spider): + __custom_setting__ = dict( + REDISDB_IP_PORTS="localhost:6379", + REDISDB_USER_PASS="", + REDISDB_DB=0, + + # QPS限流配置 + DOMAIN_RATE_LIMIT_ENABLE=True, + DOMAIN_RATE_LIMIT_STORAGE="redis", # 使用Redis,多进程共享配额 + DOMAIN_RATE_LIMIT_RULES={ + "api.example.com": 10, # API限制 10 QPS + }, + DOMAIN_RATE_LIMIT_DEFAULT=20, + ) + + def start_requests(self): + for i in range(100): + yield feapder.Request(f"https://api.example.com/data/{i}") + + def parse(self, request, response): + print(f"处理: {request.url}") + + +if __name__ == "__main__": + MyDistributedSpider(redis_key="test:spider").start() +``` + +分布式模式下,多个进程共享同一个Redis令牌桶,确保所有进程合计的QPS不超过配置值。 + +### 混合限制示例 + +```python +__custom_setting__ = dict( + DOMAIN_RATE_LIMIT_ENABLE=True, + DOMAIN_RATE_LIMIT_RULES={ + "www.baidu.com": 2, # 精确匹配 + "*.taobao.com": 5, # 通配符匹配 + "api.jd.com": 10, # 精确匹配 + }, + DOMAIN_RATE_LIMIT_DEFAULT=0, # 其他域名不限制 +) +``` + +匹配优先级: +1. **精确匹配**:先检查域名是否完全匹配 +2. **通配符匹配**:检查是否匹配 `*.xxx.com` 模式 +3. **默认值**:使用 `DOMAIN_RATE_LIMIT_DEFAULT` + +## 4. 工作原理 + +### 架构图 + +``` + ┌──────────────────────────────────────┐ + │ QPSScheduler │ + │ │ + Request ───────▶ │ ┌────────────────────────────────┐ │ + │ │ DomainRateLimiter │ │ + │ │ ┌──────────┐ ┌──────────────┐ │ │ + │ │ │ 令牌桶1 │ │ 令牌桶2 │ │ │ + │ │ │(baidu) │ │ (*.taobao) │ │ │ + │ │ └──────────┘ └──────────────┘ │ │ + │ └────────────────────────────────┘ │ + │ │ │ + │ ▼ │ + │ ┌─────────────┐ ┌─────────────┐ │ + │ │ DelayHeap │ │ ReadyQueue │ │ + │ │ (等待队列) │ │ (就绪队列) │ │ + │ └─────────────┘ └─────────────┘ │ + │ │ │ + └──────────────────────────│──────────┘ + ▼ + ParserControl + (消费处理) +``` + +### 令牌桶算法 + +采用令牌桶算法实现精确的QPS控制: + +1. **令牌生成**:按配置的QPS速率持续生成令牌 +2. **令牌消费**:每个请求消费一个令牌 +3. **预扣机制**:请求到达时立即预扣令牌,返回等待时间 +4. **排队等待**:令牌不足时,请求进入延迟队列等待 + +### 分布式模式 + +分布式模式使用Redis + Lua脚本实现: + +``` +进程A ──┐ + ├──▶ Redis令牌桶 ──▶ 统一QPS配额 +进程B ──┘ + +Lua脚本保证操作原子性,避免竞争条件 +``` + +## 5. 支持的爬虫类型 + +| 爬虫类型 | 支持 | 推荐存储模式 | +|---------|-----|-------------| +| AirSpider | ✅ | local | +| Spider | ✅ | redis(多进程时) | +| BatchSpider | ✅ | redis(多进程时) | +| TaskSpider | ✅ | redis(多进程时) | + +## 6. 注意事项 + +1. **QPS=0 表示不限制**:配置为0的域名或默认值为0时,对应请求不受QPS限制 + +2. **多进程必须用Redis模式**:`local` 模式下每个进程有独立的令牌桶,无法共享配额 + +3. **预取数量**:`DOMAIN_RATE_LIMIT_MAX_PREFETCH` 控制调度器预取的请求数,过大会占用内存,过小可能影响性能 + +4. **性能影响**:QPS关闭时(`DOMAIN_RATE_LIMIT_ENABLE=False`),代码流程与原始完全一致,无任何性能损耗 + +5. **通配符匹配**:`*.example.com` 可匹配 `a.example.com`、`b.c.example.com` 等,但不匹配 `example.com` 本身 + +## 7. 
完整代码示例 + +### 示例1:基础使用 + +```python +import feapder + + +class BasicQPSSpider(feapder.AirSpider): + __custom_setting__ = dict( + DOMAIN_RATE_LIMIT_ENABLE=True, + DOMAIN_RATE_LIMIT_RULES={ + "httpbin.org": 2, # 限制 2 QPS + }, + ) + + def start_requests(self): + for i in range(10): + yield feapder.Request(f"https://httpbin.org/get?id={i}") + + def parse(self, request, response): + print(f"状态码: {response.status_code}, URL: {request.url}") + + +if __name__ == "__main__": + BasicQPSSpider(thread_count=10).start() +``` + +### 示例2:分布式多进程 + +```python +import feapder + + +class DistributedQPSSpider(feapder.Spider): + __custom_setting__ = dict( + REDISDB_IP_PORTS="localhost:6379", + REDISDB_USER_PASS="", + REDISDB_DB=0, + + DOMAIN_RATE_LIMIT_ENABLE=True, + DOMAIN_RATE_LIMIT_STORAGE="redis", + DOMAIN_RATE_LIMIT_RULES={ + "api.example.com": 5, # 多进程合计 5 QPS + }, + ) + + def start_requests(self): + for i in range(50): + yield feapder.Request(f"https://api.example.com/item/{i}") + + def parse(self, request, response): + print(f"处理: {request.url}") + + +if __name__ == "__main__": + # 可启动多个进程,共享 5 QPS 配额 + DistributedQPSSpider(redis_key="qps:spider").start() +``` + +## 8. 常见问题 + +### Q: QPS设置了但没生效? + +A: 检查以下几点: +1. `DOMAIN_RATE_LIMIT_ENABLE` 是否为 `True` +2. 域名规则是否正确匹配(注意 `www.baidu.com` 和 `baidu.com` 是不同的) +3. 分布式模式下是否配置了 `DOMAIN_RATE_LIMIT_STORAGE="redis"` + +### Q: 多进程QPS不准确? + +A: 确保使用 `redis` 存储模式,`local` 模式下各进程独立计算,无法共享配额。 + +### Q: 如何关闭某个域名的限制? + +A: 将该域名的QPS设置为0,或不在规则中配置该域名且 `DOMAIN_RATE_LIMIT_DEFAULT=0`。 diff --git a/feapder/core/parser_control.py b/feapder/core/parser_control.py index 021d295..b650c3d 100644 --- a/feapder/core/parser_control.py +++ b/feapder/core/parser_control.py @@ -40,13 +40,14 @@ class ParserControl(threading.Thread): _hook_parsers = set() - def __init__(self, collector, redis_key, request_buffer, item_buffer): + def __init__(self, collector, redis_key, request_buffer, item_buffer, qps_scheduler=None): super(ParserControl, self).__init__() self._parsers = [] self._collector = collector self._redis_key = redis_key self._request_buffer = request_buffer self._item_buffer = item_buffer + self._qps_scheduler = qps_scheduler self._thread_stop = False @@ -54,7 +55,7 @@ def run(self): self._thread_stop = False while not self._thread_stop: try: - request = self._collector.get_request() + request = self._get_request() if not request: if not self.is_show_tip: log.debug("等待任务...") @@ -67,6 +68,49 @@ def run(self): except Exception as e: log.exception(e) + def _get_request(self): + """ + @summary: 获取请求 + 如果启用了QPS限流,流程为: + 1. 优先从就绪队列获取请求(避免死锁) + 2. 就绪队列为空时,从Collector取请求提交给调度器 + 3. 
再次尝试从就绪队列获取请求 + 如果未启用QPS限流,直接从Collector获取 + --------- + @result: 请求字典,包含request_obj和request_redis,队列为空返回None + """ + if self._qps_scheduler: + # QPS限流模式 + # 步骤1:优先从就绪队列获取请求(非阻塞,避免死锁) + request = self._qps_scheduler.get_ready_request_nowait() + if request: + return request + + # 步骤2:就绪队列为空,从Collector获取请求并提交给调度器 + # Collector.get_request() 返回的是 dict: {"request_obj": Request, "request_redis": str} + batch_size = max(1, self._qps_scheduler.max_prefetch // 10) + for _ in range(batch_size): + request_dict = self._collector.get_request() + if request_dict: + # 非阻塞提交,避免背压导致死锁 + self._qps_scheduler.submit(request_dict, block=False) + else: + break + + # 步骤3:从调度器获取已就绪的请求(带超时) + request = self._qps_scheduler.get_ready_request(timeout=1.0) + + # 超时返回None时输出调试日志 + if request is None and not self._qps_scheduler.is_empty(): + log.debug( + f"QPS: 等待就绪请求超时,调度器中仍有 {self._qps_scheduler.pending_count()} 个请求在等待令牌" + ) + + return request + else: + # 原始模式:直接从Collector获取 + return self._collector.get_request() + def is_not_task(self): return self.is_show_tip @@ -465,6 +509,7 @@ def __init__( memory_db: MemoryDB, request_buffer: AirSpiderRequestBuffer, item_buffer: ItemBuffer, + qps_scheduler=None, ): super(ParserControl, self).__init__() self._parsers = [] @@ -472,11 +517,12 @@ def __init__( self._thread_stop = False self._request_buffer = request_buffer self._item_buffer = item_buffer + self._qps_scheduler = qps_scheduler def run(self): while not self._thread_stop: try: - request = self._memory_db.get() + request = self._get_request() if not request: if not self.is_show_tip: log.debug("等待任务...") @@ -489,6 +535,49 @@ def run(self): except Exception as e: log.exception(e) + def _get_request(self): + """ + @summary: 获取请求 + 如果启用了QPS限流,流程为: + 1. 优先从就绪队列获取请求(避免死锁) + 2. 就绪队列为空时,从MemoryDB取请求提交给调度器 + 3. 
再次尝试从就绪队列获取请求 + 如果未启用QPS限流,直接从MemoryDB获取 + --------- + @result: 请求对象Request,队列为空返回None + """ + if self._qps_scheduler: + # QPS限流模式 + # 步骤1:优先从就绪队列获取请求(非阻塞,避免死锁) + request = self._qps_scheduler.get_ready_request_nowait() + if request: + return request + + # 步骤2:就绪队列为空,从MemoryDB获取请求并提交给调度器 + # 批量大小与max_prefetch相关,避免过多请求堆积在调度器中 + batch_size = max(1, self._qps_scheduler.max_prefetch // 10) + for _ in range(batch_size): + pending_request = self._memory_db.get_nowait() + if pending_request: + # 非阻塞提交,避免背压导致死锁 + self._qps_scheduler.submit(pending_request, block=False) + else: + break + + # 步骤3:从调度器获取已就绪的请求(带超时) + request = self._qps_scheduler.get_ready_request(timeout=1.0) + + # 超时返回None时输出调试日志 + if request is None and not self._qps_scheduler.is_empty(): + log.debug( + f"QPS: 等待就绪请求超时,调度器中仍有 {self._qps_scheduler.pending_count()} 个请求在等待令牌" + ) + + return request + else: + # 原始模式:直接从MemoryDB获取 + return self._memory_db.get() + def deal_request(self, request): response = None diff --git a/feapder/core/scheduler.py b/feapder/core/scheduler.py index 0177d18..852d993 100644 --- a/feapder/core/scheduler.py +++ b/feapder/core/scheduler.py @@ -20,11 +20,13 @@ from feapder.core.handle_failed_items import HandleFailedItems from feapder.core.handle_failed_requests import HandleFailedRequests from feapder.core.parser_control import ParserControl +from feapder.core.schedulers import QPSScheduler from feapder.db.redisdb import RedisDB from feapder.network.item import Item from feapder.network.request import Request from feapder.utils import metrics from feapder.utils.log import log +from feapder.utils.rate_limiter import DomainRateLimiter from feapder.utils.redis_lock import RedisLock from feapder.utils.tail_thread import TailThread @@ -154,6 +156,22 @@ def __init__( self._stop_spider = False + # 初始化QPS调度器(如果启用) + self._qps_scheduler = None + if setting.DOMAIN_RATE_LIMIT_ENABLE: + # 分布式爬虫使用Redis令牌桶实现跨进程QPS控制 + storage = "redis" if setting.DOMAIN_RATE_LIMIT_STORAGE == "redis" else "local" + rate_limiter = DomainRateLimiter( + rules=setting.DOMAIN_RATE_LIMIT_RULES, + default_qps=setting.DOMAIN_RATE_LIMIT_DEFAULT, + storage=storage, + redis_client=self._redisdb.get_redis_obj() if storage == "redis" else None + ) + self._qps_scheduler = QPSScheduler( + rate_limiter=rate_limiter, + max_prefetch=setting.DOMAIN_RATE_LIMIT_MAX_PREFETCH + ) + def init_metrics(self): """ 初始化打点系统 @@ -254,6 +272,10 @@ def _start(self): # 启动collector self._collector.start() + # 启动QPS调度器(如果启用) + if self._qps_scheduler: + self._qps_scheduler.start() + # 启动parser control for i in range(self._thread_count): parser_control = self._parser_control_obj( @@ -261,6 +283,7 @@ def _start(self): self._redis_key, self._request_buffer, self._item_buffer, + qps_scheduler=self._qps_scheduler, # 传递QPS调度器 ) for parser in self._parsers: @@ -300,6 +323,10 @@ def all_thread_is_done(self): if not parser_control.is_not_task(): return False + # 检测 QPS调度器状态(如果启用) + if self._qps_scheduler and not self._qps_scheduler.is_empty(): + return False + # 检测 item_buffer 状态 if ( self._item_buffer.get_items_count() > 0 @@ -426,6 +453,9 @@ def _stop_all_thread(self): # 停止 parser_controls for parser_control in self._parser_controls: parser_control.stop() + # 停止 QPS调度器(如果启用) + if self._qps_scheduler: + self._qps_scheduler.stop() self.heartbeat_stop() self._started.clear() diff --git a/feapder/core/schedulers/__init__.py b/feapder/core/schedulers/__init__.py new file mode 100644 index 0000000..7fd5f8a --- /dev/null +++ b/feapder/core/schedulers/__init__.py @@ -0,0 +1,12 @@ +# -*- 
coding: utf-8 -*- +""" +Created on 2024 +--------- +@summary: QPS调度模块,提供域名级QPS限流调度功能 +--------- +@author: ShellMonster +""" + +from feapder.core.schedulers.qps_scheduler import QPSScheduler + +__all__ = ["QPSScheduler"] diff --git a/feapder/core/schedulers/qps_scheduler.py b/feapder/core/schedulers/qps_scheduler.py new file mode 100644 index 0000000..055fb1d --- /dev/null +++ b/feapder/core/schedulers/qps_scheduler.py @@ -0,0 +1,378 @@ +# -*- coding: utf-8 -*- +""" +Created on 2024 +--------- +@summary: QPS调度器模块,基于令牌桶算法的域名级QPS限流调度器 +--------- +@author: ShellMonster +""" + +import heapq +import threading +import time +from collections import defaultdict +from queue import Queue, Empty +from typing import Dict, Optional, Any + +from feapder.utils.rate_limiter import DomainRateLimiter +from feapder.utils.log import log + + +class DelayedRequest: + """ + 延迟请求包装类,用于在延迟堆中存储等待执行的请求,支持按调度时间排序 + """ + + __slots__ = ('scheduled_time', 'request', 'domain') + + def __init__(self, scheduled_time: float, request: Any, domain: str): + """ + @summary: 初始化延迟请求 + --------- + @param scheduled_time: 计划执行时间(Unix时间戳) + @param request: 原始请求对象 + @param domain: 请求的域名 + --------- + @result: + """ + self.scheduled_time = scheduled_time + self.request = request + self.domain = domain + + def __lt__(self, other: 'DelayedRequest') -> bool: + """按调度时间排序(最小堆)""" + return self.scheduled_time < other.scheduled_time + + def __repr__(self) -> str: + return f"DelayedRequest(scheduled={self.scheduled_time:.3f}, domain={self.domain})" + + +class QPSScheduler: + """ + QPS调度器,单线程调度器,负责管理请求的QPS限流 + 采用生产者-消费者模式,单线程处理避免令牌桶的并发竞争 + """ + + def __init__( + self, + rate_limiter: DomainRateLimiter, + max_prefetch: int = 100 + ): + """ + @summary: 初始化QPS调度器 + --------- + @param rate_limiter: 域名级限流器实例 + @param max_prefetch: 每个域名最大预取请求数,超过后阻塞提交(默认100) + --------- + @result: + """ + self.rate_limiter = rate_limiter + self.max_prefetch = max_prefetch + + # 提交队列:外部提交的请求先进入这里 + self._submit_queue: Queue = Queue() + + # 就绪队列:已获取令牌的请求 + self._ready_queue: Queue = Queue() + + # 延迟堆:等待令牌的请求(最小堆,按scheduled_time排序) + self._delay_heap: list = [] + self._heap_lock = threading.Lock() + + # 每个域名当前在调度器中的请求数(提交队列 + 延迟堆 + 就绪队列) + self._domain_pending_count: Dict[str, int] = defaultdict(int) + self._count_lock = threading.Lock() + + # 调度线程控制 + self._running = False + self._scheduler_thread: Optional[threading.Thread] = None + + # 统计信息 + self._stats = { + 'submitted': 0, # 提交的请求总数 + 'immediate': 0, # 立即获得令牌的请求数 + 'delayed': 0, # 需要延迟的请求数 + 'ready': 0, # 已就绪的请求数 + } + + # 状态日志控制 + self._last_status_log_time = 0 + self._status_log_interval = 10 # 每10秒输出一次状态 + + def start(self) -> None: + """ + @summary: 启动调度器,启动后台调度线程开始处理请求的QPS限流 + --------- + @result: + """ + if self._running: + return + + self._running = True + self._scheduler_thread = threading.Thread( + target=self._scheduler_loop, + name="QPSScheduler", + daemon=True + ) + self._scheduler_thread.start() + log.debug("QPSScheduler started") + + def stop(self) -> None: + """ + @summary: 停止调度器,优雅关闭调度线程 + --------- + @result: + """ + if not self._running: + return + + self._running = False + if self._scheduler_thread and self._scheduler_thread.is_alive(): + self._scheduler_thread.join(timeout=5) + log.debug("QPSScheduler stopped") + + def submit(self, request: Any, block: bool = True, timeout: float = None) -> bool: + """ + @summary: 提交请求到调度器 + 请求会先进入提交队列,由调度线程处理QPS控制后放入就绪队列 + --------- + @param request: 请求对象(需有url属性)或字典(分布式爬虫格式:{"request_obj": Request, "request_redis": str}) + @param block: 当域名预取数达到上限时是否阻塞等待 + 
@param timeout: 阻塞超时时间(秒),None表示无限等待 + --------- + @result: 提交成功返回True,超时返回False + """ + # 提取域名,支持两种格式: + # 1. AirSpider: request 是 Request 对象,有 url 属性 + # 2. Spider/BatchSpider/TaskSpider: request 是字典 {"request_obj": Request, "request_redis": str} + if isinstance(request, dict) and 'request_obj' in request: + url = getattr(request['request_obj'], 'url', '') or '' + else: + url = getattr(request, 'url', '') or '' + domain = DomainRateLimiter.extract_domain(url) + + # 检查是否需要背压 + if self.max_prefetch > 0: + start_time = time.time() + logged_backpressure = False + while True: + with self._count_lock: + if self._domain_pending_count[domain] < self.max_prefetch: + self._domain_pending_count[domain] += 1 + break + + if not block: + return False + + if timeout is not None and (time.time() - start_time) >= timeout: + return False + + # 背压阻塞日志(只输出一次,避免刷屏) + if not logged_backpressure: + log.debug( + f"QPS背压: 域名 {domain} 待处理数达到上限 {self.max_prefetch},等待释放..." + ) + logged_backpressure = True + + # 等待一小段时间后重试 + time.sleep(0.01) + else: + with self._count_lock: + self._domain_pending_count[domain] += 1 + + # 放入提交队列 + self._submit_queue.put((request, domain)) + self._stats['submitted'] += 1 + return True + + def get_ready_request(self, timeout: float = 1.0) -> Optional[Any]: + """ + @summary: 获取一个就绪的请求,从就绪队列获取已经获得令牌的请求 + --------- + @param timeout: 等待超时时间(秒) + --------- + @result: 请求对象,超时返回None + """ + try: + request, domain = self._ready_queue.get(timeout=timeout) + # 减少域名计数 + with self._count_lock: + self._domain_pending_count[domain] = max( + 0, self._domain_pending_count[domain] - 1 + ) + return request + except Empty: + return None + + def get_ready_request_nowait(self) -> Optional[Any]: + """ + @summary: 非阻塞获取就绪请求 + --------- + @result: 请求对象,队列为空返回None + """ + try: + request, domain = self._ready_queue.get_nowait() + with self._count_lock: + self._domain_pending_count[domain] = max( + 0, self._domain_pending_count[domain] - 1 + ) + return request + except Empty: + return None + + def _scheduler_loop(self) -> None: + """ + @summary: 调度器主循环 + 持续处理提交队列中的新请求,检查延迟堆中是否有到期的请求 + --------- + @result: + """ + while self._running: + try: + # 处理提交队列(非阻塞,处理所有待处理的) + processed = self._process_submit_queue() + + # 处理延迟堆(将到期的请求移入就绪队列) + moved = self._process_delay_heap() + + # 定期输出状态日志(DEBUG模式下) + self._log_status_if_needed() + + # 如果没有任何工作,短暂休眠避免CPU空转 + if not processed and not moved: + time.sleep(0.001) # 1ms + + except Exception as e: + log.error(f"QPSScheduler error: {e}") + time.sleep(0.01) + + def _process_submit_queue(self) -> int: + """ + @summary: 处理提交队列 + 从提交队列取出请求,立即获得令牌则放入就绪队列,否则放入延迟堆 + --------- + @result: 处理的请求数量 + """ + count = 0 + while True: + try: + request, domain = self._submit_queue.get_nowait() + except Empty: + break + + count += 1 + + # 获取令牌 + wait_time = self.rate_limiter.acquire(domain) + + if wait_time <= 0: + # 立即可执行,放入就绪队列 + self._ready_queue.put((request, domain)) + self._stats['immediate'] += 1 + self._stats['ready'] += 1 + else: + # 需要等待,放入延迟堆 + scheduled_time = time.time() + wait_time + delayed = DelayedRequest(scheduled_time, request, domain) + with self._heap_lock: + heapq.heappush(self._delay_heap, delayed) + self._stats['delayed'] += 1 + + return count + + def _process_delay_heap(self) -> int: + """ + @summary: 处理延迟堆,检查是否有到期的请求,将到期的移入就绪队列 + --------- + @result: 移动的请求数量 + """ + count = 0 + now = time.time() + + with self._heap_lock: + while self._delay_heap: + # 查看堆顶(最早到期的) + top = self._delay_heap[0] + if top.scheduled_time > now: + # 还没到期,停止处理 + break + + # 到期了,弹出并放入就绪队列 + 
heapq.heappop(self._delay_heap) + self._ready_queue.put((top.request, top.domain)) + self._stats['ready'] += 1 + count += 1 + + return count + + def _log_status_if_needed(self) -> None: + """ + @summary: 定期输出调度器状态日志(DEBUG模式下) + --------- + @result: + """ + now = time.time() + if now - self._last_status_log_time >= self._status_log_interval: + self._last_status_log_time = now + + with self._heap_lock: + delay_heap_size = len(self._delay_heap) + + submit_queue_size = self._submit_queue.qsize() + ready_queue_size = self._ready_queue.qsize() + + # 只有当有请求在处理时才输出日志 + if delay_heap_size > 0 or submit_queue_size > 0 or ready_queue_size > 0: + log.debug( + f"QPS调度器状态: 提交队列={submit_queue_size}, " + f"延迟堆={delay_heap_size}, 就绪队列={ready_queue_size}, " + f"统计={self._stats}" + ) + + def is_empty(self) -> bool: + """ + @summary: 检查调度器是否为空 + 当提交队列、延迟堆、就绪队列都为空时返回True + --------- + @result: 是否为空 + """ + with self._heap_lock: + heap_empty = len(self._delay_heap) == 0 + + return ( + self._submit_queue.empty() and + heap_empty and + self._ready_queue.empty() + ) + + def pending_count(self, domain: str = None) -> int: + """ + @summary: 获取待处理请求数 + --------- + @param domain: 指定域名,None表示所有域名 + --------- + @result: 待处理请求数 + """ + with self._count_lock: + if domain: + return self._domain_pending_count.get(domain, 0) + return sum(self._domain_pending_count.values()) + + def get_stats(self) -> Dict[str, int]: + """ + @summary: 获取统计信息 + --------- + @result: 统计数据字典,包含submitted/immediate/delayed/ready等计数 + """ + return dict(self._stats) + + def __repr__(self) -> str: + with self._heap_lock: + heap_size = len(self._delay_heap) + return ( + f"QPSScheduler(" + f"submit_queue={self._submit_queue.qsize()}, " + f"delay_heap={heap_size}, " + f"ready_queue={self._ready_queue.qsize()}, " + f"running={self._running})" + ) diff --git a/feapder/core/spiders/air_spider.py b/feapder/core/spiders/air_spider.py index 70c3011..80e3f2d 100644 --- a/feapder/core/spiders/air_spider.py +++ b/feapder/core/spiders/air_spider.py @@ -19,6 +19,8 @@ from feapder.utils import metrics from feapder.utils.log import log from feapder.utils.tail_thread import TailThread +from feapder.utils.rate_limiter import DomainRateLimiter +from feapder.core.schedulers import QPSScheduler class AirSpider(BaseParser, TailThread): @@ -48,6 +50,19 @@ def __init__(self, thread_count=None): self._stop_spider = False metrics.init(**setting.METRICS_OTHER_ARGS) + # 初始化QPS调度器(如果启用) + self._qps_scheduler = None + if setting.DOMAIN_RATE_LIMIT_ENABLE: + rate_limiter = DomainRateLimiter( + rules=setting.DOMAIN_RATE_LIMIT_RULES, + default_qps=setting.DOMAIN_RATE_LIMIT_DEFAULT, + storage="local" # AirSpider使用本地内存模式 + ) + self._qps_scheduler = QPSScheduler( + rate_limiter=rate_limiter, + max_prefetch=setting.DOMAIN_RATE_LIMIT_MAX_PREFETCH + ) + def distribute_task(self): for request in self.start_requests(): if not isinstance(request, Request): @@ -67,6 +82,10 @@ def all_thread_is_done(self): if not self._memory_db.empty(): return False + # 检测 QPS调度器状态(如果启用) + if self._qps_scheduler and not self._qps_scheduler.is_empty(): + return False + # 检测 item_buffer 状态 if ( self._item_buffer.get_items_count() > 0 @@ -81,11 +100,17 @@ def all_thread_is_done(self): def run(self): self.start_callback() + # 启动QPS调度器(如果启用) + if self._qps_scheduler: + self._qps_scheduler.start() + log.info("域名级QPS限流已启用") + for i in range(self._thread_count): parser_control = AirSpiderParserControl( memory_db=self._memory_db, request_buffer=self._request_buffer, item_buffer=self._item_buffer, + 
qps_scheduler=self._qps_scheduler, # 传递QPS调度器 ) parser_control.add_parser(self) parser_control.start() @@ -98,6 +123,10 @@ def run(self): while True: try: if self._stop_spider or self.all_thread_is_done(): + # 停止QPS调度器(如果启用) + if self._qps_scheduler: + self._qps_scheduler.stop() + # 停止 parser_controls for parser_control in self._parser_controls: parser_control.stop() diff --git a/feapder/db/memorydb.py b/feapder/db/memorydb.py index 99c8c7d..8bd81e5 100644 --- a/feapder/db/memorydb.py +++ b/feapder/db/memorydb.py @@ -29,16 +29,32 @@ def add(self, item, ignore_max_size=False): else: self.priority_queue.put(item) - def get(self): + def get(self, timeout: float = 1): """ - 获取任务 - :return: + 获取任务(阻塞模式) + :param timeout: 超时时间(秒),默认1秒 + :return: 任务对象,超时返回None """ try: - item = self.priority_queue.get(timeout=1) + item = self.priority_queue.get(timeout=timeout) return item except: return + def get_nowait(self): + """ + 获取任务(非阻塞模式) + + 立即返回,队列为空时返回None,不会阻塞等待。 + 用于QPS调度器快速获取请求。 + + :return: 任务对象,队列为空返回None + """ + try: + item = self.priority_queue.get_nowait() + return item + except: + return None + def empty(self): return self.priority_queue.empty() diff --git a/feapder/setting.py b/feapder/setting.py index 985709b..9971c30 100644 --- a/feapder/setting.py +++ b/feapder/setting.py @@ -120,6 +120,21 @@ # 内存任务队列最大缓存的任务数,默认不限制;仅对AirSpider有效。 TASK_MAX_CACHED_SIZE = 0 +# 域名级QPS限流配置 +# 是否启用域名级QPS限流 +DOMAIN_RATE_LIMIT_ENABLE = False +# 默认QPS限制,0表示不限制 +DOMAIN_RATE_LIMIT_DEFAULT = 0 +# 域名QPS规则,格式:{"域名或通配符": QPS值} +# 示例:{"www.baidu.com": 1, "*.taobao.com": 5} +# 支持精确匹配和通配符匹配(*.example.com 匹配所有子域名) +DOMAIN_RATE_LIMIT_RULES = {} +# 每个域名最大预取请求数,防止内存溢出,0表示不限制 +DOMAIN_RATE_LIMIT_MAX_PREFETCH = 100 +# 令牌桶存储模式:local-本地内存(单进程),redis-Redis分布式(多进程共享QPS配额) +# AirSpider默认使用local,Spider/BatchSpider/TaskSpider建议使用redis +DOMAIN_RATE_LIMIT_STORAGE = "local" + # 下载缓存 利用redis缓存,但由于内存大小限制,所以建议仅供开发调试代码时使用,防止每次debug都需要网络请求 RESPONSE_CACHED_ENABLE = False # 是否启用下载缓存 成本高的数据或容易变需求的数据,建议设置为True RESPONSE_CACHED_EXPIRE_TIME = 3600 # 缓存时间 秒 diff --git a/feapder/templates/project_template/setting.py b/feapder/templates/project_template/setting.py index e09506b..9671141 100644 --- a/feapder/templates/project_template/setting.py +++ b/feapder/templates/project_template/setting.py @@ -113,6 +113,21 @@ # ITEM_UPLOAD_INTERVAL = 1 # # 内存任务队列最大缓存的任务数,默认不限制;仅对AirSpider有效。 # TASK_MAX_CACHED_SIZE = 0 + +# # 域名级QPS限流配置 +# # 是否启用域名级QPS限流 +# DOMAIN_RATE_LIMIT_ENABLE = False +# # 默认QPS限制,0表示不限制 +# DOMAIN_RATE_LIMIT_DEFAULT = 0 +# # 域名QPS规则,格式:{"域名或通配符": QPS值} +# # 示例:{"www.baidu.com": 1, "*.taobao.com": 5} +# # 支持精确匹配和通配符匹配(*.example.com 匹配所有子域名) +# DOMAIN_RATE_LIMIT_RULES = {} +# # 每个域名最大预取请求数,防止内存溢出,0表示不限制 +# DOMAIN_RATE_LIMIT_MAX_PREFETCH = 100 +# # 令牌桶存储模式:local-本地内存(单进程),redis-Redis分布式(多进程共享QPS配额) +# # AirSpider默认使用local,Spider/BatchSpider/TaskSpider建议使用redis +# DOMAIN_RATE_LIMIT_STORAGE = "local" # # # 下载缓存 利用redis缓存,但由于内存大小限制,所以建议仅供开发调试代码时使用,防止每次debug都需要网络请求 # RESPONSE_CACHED_ENABLE = False # 是否启用下载缓存 成本高的数据或容易变需求的数据,建议设置为True diff --git a/feapder/utils/rate_limiter.py b/feapder/utils/rate_limiter.py new file mode 100644 index 0000000..acc2fd6 --- /dev/null +++ b/feapder/utils/rate_limiter.py @@ -0,0 +1,282 @@ +# -*- coding: utf-8 -*- +""" +Created on 2024 +--------- +@summary: 域名级QPS限流模块,提供基于令牌桶算法的域名级别请求频率控制 +--------- +@author: ShellMonster +""" +import time +import threading +from typing import Dict, Optional +from urllib.parse import urlparse + + +class LocalTokenBucket: + """ + 本地内存令牌桶,基于令牌桶算法实现的单机限流器 + 特点:线程安全、令牌按时间自动补充、支持预扣模式 + """ + + 
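+    # 行为示意(假设 qps=2,且下面三次 acquire() 几乎同时发生):
+    #   bucket = LocalTokenBucket(qps=2)
+    #   bucket.acquire()  -> 0      桶内初始只有1个令牌,第1次调用立即放行
+    #   bucket.acquire()  -> 0.5    令牌不足,预扣后返回约 (1 - 0) / 2 = 0.5 秒等待
+    #   bucket.acquire()  -> 1.0    继续预扣(令牌可为负数),等待时间按 (1 - tokens) / qps 递增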
def __init__(self, qps: int): + """ + @summary: 初始化本地令牌桶 + --------- + @param qps: 每秒允许的请求数(同时也是桶容量) + --------- + @result: + """ + if qps <= 0: + raise ValueError("qps must be positive") + + self.capacity = qps # 桶容量(最大令牌数) + self.tokens = 1.0 # 当前令牌数(初始1个,避免突发) + self.qps = qps # 每秒生成的令牌数 + self.last_update = time.time() + self.lock = threading.Lock() + + def acquire(self) -> float: + """ + @summary: 获取一个令牌 + --------- + @result: 0 表示立即获得令牌,>0 表示需要等待的秒数(令牌已预扣) + """ + with self.lock: + now = time.time() + # 防止系统时钟回拨导致的问题 + elapsed = max(0, now - self.last_update) + + # 根据时间流逝补充令牌(不超过桶容量) + self.tokens = min(self.capacity, self.tokens + elapsed * self.qps) + self.last_update = now + + if self.tokens >= 1: + # 有令牌可用,消费一个 + self.tokens -= 1 + return 0 + else: + # 无令牌,计算需要等待的时间 + # 等待时间 = (需要的令牌数 - 当前令牌数) / 每秒生成速率 + wait_time = (1 - self.tokens) / self.qps + # 预扣令牌:tokens减1(可为负数),确保后续请求排队等待 + self.tokens -= 1 + return wait_time + + def __repr__(self) -> str: + return f"LocalTokenBucket(qps={self.qps}, tokens={self.tokens:.2f})" + + +class RedisTokenBucket: + """ + Redis分布式令牌桶,基于Redis + Lua脚本实现的分布式限流器 + 特点:多进程/多机器共享QPS配额、Lua脚本保证操作原子性、自动过期清理 + """ + + # Lua脚本:原子性地检查并消费令牌 + ACQUIRE_SCRIPT = """ + local key = KEYS[1] + local capacity = tonumber(ARGV[1]) + local qps = tonumber(ARGV[2]) + local now = tonumber(ARGV[3]) + + -- 获取当前状态 + local tokens = tonumber(redis.call('HGET', key, 'tokens')) + local last_update = tonumber(redis.call('HGET', key, 'last_update')) + + -- 初始化(首次访问,初始1个令牌避免突发) + if not tokens then + tokens = 1 + end + if not last_update then + last_update = now + end + + -- 计算应补充的令牌数(防止时钟回拨) + local elapsed = math.max(0, now - last_update) + tokens = math.min(capacity, tokens + elapsed * qps) + + -- 尝试消费令牌 + if tokens >= 1 then + tokens = tokens - 1 + redis.call('HSET', key, 'tokens', tostring(tokens)) + redis.call('HSET', key, 'last_update', tostring(now)) + redis.call('EXPIRE', key, 3600) -- 1小时无访问自动清理 + return 0 -- 成功获取 + else + -- 计算等待时间并预扣 + local wait_time = (1 - tokens) / qps + tokens = tokens - 1 -- 预扣令牌(可为负数),确保后续请求排队 + redis.call('HSET', key, 'tokens', tostring(tokens)) + redis.call('HSET', key, 'last_update', tostring(now)) + redis.call('EXPIRE', key, 3600) + return wait_time + end + """ + + def __init__(self, redis_client, key: str, qps: int): + """ + @summary: 初始化Redis分布式令牌桶 + --------- + @param redis_client: Redis客户端实例 + @param key: Redis中存储令牌桶状态的key + @param qps: 每秒允许的请求数 + --------- + @result: + """ + if qps <= 0: + raise ValueError("qps must be positive") + + self.redis = redis_client + self.key = key + self.qps = qps + self.capacity = qps + self._script = None # 延迟注册脚本 + + def acquire(self) -> float: + """ + @summary: 获取一个令牌(分布式版本) + --------- + @result: 0 表示立即获得令牌,>0 表示需要等待的秒数 + """ + # 延迟注册Lua脚本(首次调用时) + if self._script is None: + self._script = self.redis.register_script(self.ACQUIRE_SCRIPT) + + result = self._script( + keys=[self.key], + args=[self.capacity, self.qps, time.time()] + ) + return float(result) + + def __repr__(self) -> str: + return f"RedisTokenBucket(key={self.key}, qps={self.qps})" + + +class DomainRateLimiter: + """ + 域名级限流管理器,统一管理多个域名的QPS限制 + 支持精确域名匹配、通配符匹配、默认QPS、自动选择本地/Redis模式 + """ + + def __init__( + self, + rules: Optional[Dict[str, int]] = None, + default_qps: int = 0, + storage: str = "local", + redis_client=None + ): + """ + @summary: 初始化域名级限流管理器 + --------- + @param rules: 域名QPS规则字典,格式 {"域名或通配符": QPS值} + @param default_qps: 默认QPS限制,0表示不限制 + @param storage: 存储模式,"local"(本地内存)或 "redis"(分布式) + @param redis_client: 
Redis客户端实例(storage="redis"时必须提供) + --------- + @result: + """ + self.rules = rules or {} + self.default_qps = default_qps + self.storage = storage + self.redis_client = redis_client + + # 令牌桶缓存(按域名+QPS组合) + self._local_buckets: Dict[str, LocalTokenBucket] = {} + self._redis_buckets: Dict[str, RedisTokenBucket] = {} + + # 验证参数 + if storage == "redis" and redis_client is None: + raise ValueError("redis_client is required when storage='redis'") + + def get_qps_limit(self, domain: str) -> int: + """ + @summary: 获取指定域名的QPS限制 + 匹配优先级:1.精确匹配 2.通配符匹配 3.默认值 + --------- + @param domain: 域名(如 "www.baidu.com") + --------- + @result: QPS限制值,0表示不限制 + """ + if not domain: + return self.default_qps + + # 1. 精确匹配 + if domain in self.rules: + return self.rules[domain] + + # 2. 通配符匹配(*.example.com) + for pattern, qps in self.rules.items(): + if pattern.startswith("*."): + suffix = pattern[2:] # 去掉 "*." + # 匹配 "sub.example.com" 或 "example.com" 本身 + if domain.endswith("." + suffix) or domain == suffix: + return qps + + # 3. 默认值 + return self.default_qps + + def acquire(self, domain: str) -> float: + """ + @summary: 获取指定域名的一个令牌 + --------- + @param domain: 域名 + --------- + @result: 0 表示立即可执行,>0 表示需要等待的秒数 + """ + qps_limit = self.get_qps_limit(domain) + + # QPS <= 0 表示不限制 + if qps_limit <= 0: + return 0 + + # 根据存储模式选择令牌桶 + if self.storage == "redis" and self.redis_client: + return self._acquire_redis(domain, qps_limit) + else: + return self._acquire_local(domain, qps_limit) + + def _acquire_local(self, domain: str, qps_limit: int) -> float: + """从本地令牌桶获取令牌""" + # 使用 "域名:QPS" 作为key,支持同一域名不同QPS配置 + cache_key = f"{domain}:{qps_limit}" + + if cache_key not in self._local_buckets: + self._local_buckets[cache_key] = LocalTokenBucket(qps_limit) + + return self._local_buckets[cache_key].acquire() + + def _acquire_redis(self, domain: str, qps_limit: int) -> float: + """从Redis令牌桶获取令牌""" + # Redis key格式:feapder:rate_limit:域名:QPS + redis_key = f"feapder:rate_limit:{domain}:{qps_limit}" + + if redis_key not in self._redis_buckets: + self._redis_buckets[redis_key] = RedisTokenBucket( + self.redis_client, redis_key, qps_limit + ) + + return self._redis_buckets[redis_key].acquire() + + @staticmethod + def extract_domain(url: str) -> str: + """ + @summary: 从URL中提取域名 + --------- + @param url: 完整URL(如 "https://www.baidu.com/path?query=1") + --------- + @result: 域名(如 "www.baidu.com"),提取失败返回空字符串 + """ + if not url: + return "" + + try: + parsed = urlparse(url) + return parsed.hostname or "" + except Exception: + return "" + + def __repr__(self) -> str: + return ( + f"DomainRateLimiter(rules={self.rules}, " + f"default_qps={self.default_qps}, storage={self.storage})" + ) diff --git a/tests/qps-scheduler/test_air_spider_qps.py b/tests/qps-scheduler/test_air_spider_qps.py new file mode 100644 index 0000000..dec0c37 --- /dev/null +++ b/tests/qps-scheduler/test_air_spider_qps.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +""" +AirSpider QPS限流集成测试 + +测试AirSpider与QPS调度器的集成效果。 + +Author: ShellMonster +Created: 2024 +""" + +import time +import threading +import unittest + +import feapder +import feapder.setting as setting +from feapder import AirSpider, Request + + +class QPSTestSpider(AirSpider): + """测试用爬虫""" + + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_DEFAULT": 5, # 5 QPS + "DOMAIN_RATE_LIMIT_RULES": { + "httpbin.org": 2, # 2 QPS for httpbin + }, + "DOMAIN_RATE_LIMIT_MAX_PREFETCH": 50, + "SPIDER_THREAD_COUNT": 3, + } + + def __init__(self): + super().__init__() + self.request_times = [] + 
self.request_lock = threading.Lock() + + def start_requests(self): + # 生成10个请求 + for i in range(10): + yield Request(f"https://httpbin.org/get?id={i}") + + def parse(self, request, response): + with self.request_lock: + self.request_times.append(time.time()) + + +class TestAirSpiderQPSIntegration(unittest.TestCase): + """AirSpider QPS集成测试""" + + def test_qps_scheduler_initialization(self): + """测试QPS调度器是否正确初始化""" + spider = QPSTestSpider() + self.assertIsNotNone(spider._qps_scheduler) + self.assertEqual(spider._qps_scheduler.max_prefetch, 50) + + def test_qps_disabled(self): + """测试QPS禁用时调度器不初始化""" + class NoQPSSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": False, + } + + def start_requests(self): + yield Request("https://example.com") + + def parse(self, request, response): + pass + + spider = NoQPSSpider() + self.assertIsNone(spider._qps_scheduler) + + +class TestQPSConfigInheriting(unittest.TestCase): + """测试QPS配置继承""" + + def test_custom_setting_override(self): + """测试__custom_setting__覆盖全局配置""" + # 保存原始设置 + original_enable = setting.DOMAIN_RATE_LIMIT_ENABLE + + # 测试自定义设置 + class CustomSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_DEFAULT": 100, + } + + def start_requests(self): + yield Request("https://example.com") + + def parse(self, request, response): + pass + + spider = CustomSpider() + # 验证爬虫级别设置生效 + self.assertTrue(setting.DOMAIN_RATE_LIMIT_ENABLE) + self.assertEqual(setting.DOMAIN_RATE_LIMIT_DEFAULT, 100) + + # 恢复原始设置 + setting.DOMAIN_RATE_LIMIT_ENABLE = original_enable + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/qps-scheduler/test_distributed_qps.py b/tests/qps-scheduler/test_distributed_qps.py new file mode 100644 index 0000000..97bb0ef --- /dev/null +++ b/tests/qps-scheduler/test_distributed_qps.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +""" +分布式QPS测试 - 多进程共享QPS配额 + +测试场景: +- 启动2个进程,模拟2个爬虫实例 +- 配置 baidu.com QPS=2(两个进程共享这个配额) +- 验证两个进程合计的QPS是否被控制在2 + +运行前请设置环境变量: + export REDIS_HOST=your_redis_host + export REDIS_PORT=6379 + export REDIS_PASSWORD=your_password + +Author: ShellMonster +Created: 2024 +""" + +import os +import time +import redis +import multiprocessing +from collections import defaultdict + +from feapder.core.schedulers import QPSScheduler +from feapder.utils.rate_limiter import DomainRateLimiter +from feapder.network.request import Request + + +# Redis配置(从环境变量读取) +REDIS_CONFIG = { + "host": os.getenv("REDIS_HOST", "localhost"), + "port": int(os.getenv("REDIS_PORT", 6379)), + "password": os.getenv("REDIS_PASSWORD", ""), + "decode_responses": True, +} + +# 测试配置 +TARGET_QPS = 2 # 目标QPS(两个进程共享) +NUM_REQUESTS_PER_PROCESS = 10 # 每个进程发送的请求数 +NUM_PROCESSES = 2 # 进程数 + + +def clear_redis_keys(): + """清理Redis中的测试数据""" + r = redis.Redis(**REDIS_CONFIG) + # 清理令牌桶相关的key + keys = r.keys("feapder:rate_limit:*") + if keys: + r.delete(*keys) + print(f"已清理 {len(keys)} 个Redis key") + + +def worker_process(process_id, result_queue): + """ + 工作进程:模拟一个爬虫实例 + + @param process_id: 进程ID + @param result_queue: 结果队列,用于收集请求时间 + """ + # 创建Redis客户端 + redis_client = redis.Redis(**REDIS_CONFIG) + + # 创建限流器(Redis模式) + rate_limiter = DomainRateLimiter( + rules={"www.baidu.com": TARGET_QPS}, + default_qps=0, + storage="redis", + redis_client=redis_client, + ) + + # 创建调度器 + scheduler = QPSScheduler(rate_limiter=rate_limiter, max_prefetch=50) + scheduler.start() + + # 记录请求时间 + request_times = [] + + # 提交请求 + for i in range(NUM_REQUESTS_PER_PROCESS): + 
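+        # 提交后由调度线程向 Redis 令牌桶取令牌,两个进程共享同一份 baidu.com 的 QPS 配额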
scheduler.submit(Request(f"https://www.baidu.com/process{process_id}/page{i}")) + + # 获取就绪的请求 + completed = 0 + while completed < NUM_REQUESTS_PER_PROCESS: + request = scheduler.get_ready_request(timeout=1.0) + if request: + request_times.append(time.time()) + completed += 1 + print(f"[进程{process_id}] 获取请求 {completed}/{NUM_REQUESTS_PER_PROCESS}: {request.url}") + + scheduler.stop() + + # 将结果放入队列 + result_queue.put({ + "process_id": process_id, + "times": request_times, + }) + + print(f"[进程{process_id}] 完成,共处理 {len(request_times)} 个请求") + + +def run_distributed_test(): + """运行分布式QPS测试""" + print("=" * 70) + print("分布式QPS测试 - 多进程共享QPS配额") + print("=" * 70) + print(f"\n配置:") + print(f" - 进程数: {NUM_PROCESSES}") + print(f" - 每进程请求数: {NUM_REQUESTS_PER_PROCESS}") + print(f" - 总请求数: {NUM_PROCESSES * NUM_REQUESTS_PER_PROCESS}") + print(f" - 目标共享QPS: {TARGET_QPS}") + print(f" - 预期总耗时: {(NUM_PROCESSES * NUM_REQUESTS_PER_PROCESS - 1) / TARGET_QPS:.1f}秒") + + # 清理Redis + print("\n清理Redis...") + clear_redis_keys() + + # 创建结果队列 + result_queue = multiprocessing.Queue() + + # 启动多个进程 + print(f"\n启动 {NUM_PROCESSES} 个进程...") + start_time = time.time() + + processes = [] + for i in range(NUM_PROCESSES): + p = multiprocessing.Process(target=worker_process, args=(i, result_queue)) + p.start() + processes.append(p) + + # 等待所有进程完成 + for p in processes: + p.join(timeout=60) + + total_duration = time.time() - start_time + + # 收集结果 + all_times = [] + process_results = {} + + while not result_queue.empty(): + result = result_queue.get() + process_results[result["process_id"]] = result["times"] + all_times.extend(result["times"]) + + # 分析结果 + print("\n" + "=" * 70) + print("测试结果") + print("=" * 70) + + print(f"\n总耗时: {total_duration:.2f}秒") + + # 各进程统计 + for pid, times in sorted(process_results.items()): + times = sorted(times) + count = len(times) + if count >= 2: + time_span = times[-1] - times[0] + qps = (count - 1) / time_span if time_span > 0 else float('inf') + else: + time_span = 0 + qps = 0 + print(f"\n进程{pid}:") + print(f" 请求数: {count}") + print(f" 时间跨度: {time_span:.2f}秒") + print(f" 单进程QPS: {qps:.2f}") + + # 合并统计(关键指标) + all_times.sort() + total_count = len(all_times) + if total_count >= 2: + total_time_span = all_times[-1] - all_times[0] + actual_qps = (total_count - 1) / total_time_span if total_time_span > 0 else float('inf') + else: + total_time_span = 0 + actual_qps = 0 + + print("\n" + "-" * 50) + print("【合并统计 - 所有进程】") + print("-" * 50) + print(f" 总请求数: {total_count}") + print(f" 总时间跨度: {total_time_span:.2f}秒") + print(f" 实际共享QPS: {actual_qps:.2f}") + print(f" 配置QPS: {TARGET_QPS}") + + # 验证QPS + qps_error = abs(actual_qps - TARGET_QPS) / TARGET_QPS * 100 + print(f" 误差: {qps_error:.1f}%") + + if qps_error < 10: + print(f"\n✅ 分布式QPS共享控制精确!") + print(f" {NUM_PROCESSES}个进程共享 {TARGET_QPS} QPS 配额,实际 {actual_qps:.2f} QPS") + else: + print(f"\n⚠️ 分布式QPS控制有偏差") + + # 验证时间跨度 + expected_time = (total_count - 1) / TARGET_QPS + print(f"\n时间跨度验证:") + print(f" 预期: {expected_time:.1f}秒") + print(f" 实际: {total_time_span:.1f}秒") + + # 清理 + print("\n清理Redis...") + clear_redis_keys() + + +if __name__ == "__main__": + run_distributed_test() + print("\n" + "=" * 70) + print("分布式QPS测试完成!") + print("=" * 70) diff --git a/tests/qps-scheduler/test_feapder_integration.py b/tests/qps-scheduler/test_feapder_integration.py new file mode 100644 index 0000000..faf4d24 --- /dev/null +++ b/tests/qps-scheduler/test_feapder_integration.py @@ -0,0 +1,388 @@ +# -*- coding: utf-8 -*- +""" +基于 feapder AirSpider 的 QPS 限流集成测试 + +测试目标: +1. 
验证 QPS 限流在真实 AirSpider 中的效果 +2. 对比 QPS 开启/关闭时的性能差异 +3. 验证 QPS 关闭时原有流程不受影响 + +Author: ShellMonster +Created: 2024 +""" + +import time +import threading +from collections import defaultdict + +import feapder +import feapder.setting as setting +from feapder import AirSpider, Request + + +# 使用本地 HTTP 服务模拟,避免真实网络请求 +# 这里用一个简单的 mock 响应来测试 + + +class MockResponse: + """模拟响应对象""" + def __init__(self, url): + self.url = url + self.status_code = 200 + self.text = f"Mock response for {url}" + self.content = self.text.encode() + + +def test_qps_enabled_spider(): + """测试1:QPS 开启时的 AirSpider""" + print("=" * 60) + print("测试1: QPS 开启时的 AirSpider 集成测试") + print("=" * 60) + + request_times = defaultdict(list) + lock = threading.Lock() + + class QPSEnabledSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_DEFAULT": 0, # 不在规则中的域名不限制 + "DOMAIN_RATE_LIMIT_RULES": { + "site-a.com": 3, # 3 QPS + "site-b.com": 5, # 5 QPS + }, + "DOMAIN_RATE_LIMIT_MAX_PREFETCH": 50, + "SPIDER_THREAD_COUNT": 4, + "LOG_LEVEL": "ERROR", # 减少日志输出 + } + + def start_requests(self): + # 每个域名生成10个请求 + for i in range(10): + yield Request(f"https://site-a.com/page{i}", auto_request=False) + for i in range(10): + yield Request(f"https://site-b.com/page{i}", auto_request=False) + + def parse(self, request, response): + domain = request.url.split("/")[2] + with lock: + request_times[domain].append(time.time()) + + def download_midware(self, request): + # Mock 下载,不发真实请求 + return MockResponse(request.url) + + spider = QPSEnabledSpider() + + # 验证调度器已初始化 + print(f"QPS调度器已初始化: {spider._qps_scheduler is not None}") + + start_time = time.time() + spider.run() + total_time = time.time() - start_time + + print(f"\n总耗时: {total_time:.2f}秒") + print(f"处理请求数: {sum(len(v) for v in request_times.values())}") + print("\n各域名实际QPS:") + print("-" * 40) + + results = {} + for domain in sorted(request_times.keys()): + times = sorted(request_times[domain]) + if len(times) >= 2: + time_span = times[-1] - times[0] + actual_qps = (len(times) - 1) / time_span if time_span > 0 else float('inf') + configured_qps = spider.__custom_setting__["DOMAIN_RATE_LIMIT_RULES"].get(domain, 0) + diff = abs(actual_qps - configured_qps) + status = "✅" if diff < 1.0 else "⚠️" + print(f" {status} {domain}: 配置={configured_qps} QPS, 实际={actual_qps:.2f} QPS") + results[domain] = {"configured": configured_qps, "actual": actual_qps} + + return total_time, results + + +def test_qps_disabled_spider(): + """测试2:QPS 关闭时的 AirSpider(验证原有流程不受影响)""" + print("\n" + "=" * 60) + print("测试2: QPS 关闭时的 AirSpider(验证性能无损)") + print("=" * 60) + + request_times = [] + lock = threading.Lock() + + class QPSDisabledSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": False, # 关闭 QPS + "SPIDER_THREAD_COUNT": 4, + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + # 生成20个请求 + for i in range(20): + yield Request(f"https://test.com/page{i}", auto_request=False) + + def parse(self, request, response): + with lock: + request_times.append(time.time()) + + def download_midware(self, request): + return MockResponse(request.url) + + spider = QPSDisabledSpider() + + # 验证调度器未初始化 + print(f"QPS调度器未初始化: {spider._qps_scheduler is None}") + + start_time = time.time() + spider.run() + total_time = time.time() - start_time + + print(f"\n总耗时: {total_time:.2f}秒") + print(f"处理请求数: {len(request_times)}") + + if len(request_times) >= 2: + time_span = request_times[-1] - request_times[0] + actual_qps = (len(request_times) - 1) / time_span if time_span > 0 else 
float('inf') + print(f"实际吞吐量: {actual_qps:.2f} 请求/秒(无限制)") + + return total_time, len(request_times) + + +def test_performance_comparison(): + """测试3:性能对比(QPS 开启 vs 关闭)""" + print("\n" + "=" * 60) + print("测试3: 性能对比(验证 QPS 关闭时无性能损失)") + print("=" * 60) + + # 相同请求数量,对比 QPS 开启/关闭的性能 + num_requests = 30 + request_times_enabled = [] + request_times_disabled = [] + lock = threading.Lock() + + # QPS 开启,但设置很高的限制(模拟不限制) + class HighQPSSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_DEFAULT": 1000, # 极高的 QPS,基本不限制 + "DOMAIN_RATE_LIMIT_RULES": {}, + "SPIDER_THREAD_COUNT": 4, + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + for i in range(num_requests): + yield Request(f"https://perf-test.com/page{i}", auto_request=False) + + def parse(self, request, response): + with lock: + request_times_enabled.append(time.time()) + + def download_midware(self, request): + return MockResponse(request.url) + + # QPS 关闭 + class NoQPSSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": False, + "SPIDER_THREAD_COUNT": 4, + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + for i in range(num_requests): + yield Request(f"https://perf-test.com/page{i}", auto_request=False) + + def parse(self, request, response): + with lock: + request_times_disabled.append(time.time()) + + def download_midware(self, request): + return MockResponse(request.url) + + # 运行 QPS 关闭的爬虫 + print("\n运行 QPS 关闭的爬虫...") + start1 = time.time() + NoQPSSpider().run() + time_disabled = time.time() - start1 + + # 运行 QPS 开启(高限制)的爬虫 + print("运行 QPS 开启(高限制)的爬虫...") + start2 = time.time() + HighQPSSpider().run() + time_enabled = time.time() - start2 + + print(f"\n性能对比结果:") + print(f" QPS 关闭: {time_disabled:.3f}秒 ({len(request_times_disabled)} 请求)") + print(f" QPS 开启: {time_enabled:.3f}秒 ({len(request_times_enabled)} 请求)") + + # 计算性能差异 + if time_disabled > 0: + overhead = ((time_enabled - time_disabled) / time_disabled) * 100 + print(f" 性能开销: {overhead:.1f}%") + + if overhead < 20: # 允许20%以内的开销 + print(" ✅ 性能开销在可接受范围内") + else: + print(" ⚠️ 性能开销较大,需要优化") + + return time_disabled, time_enabled + + +def test_qps_scheduler_flow(): + """测试4:验证 QPS 调度器的完整流程""" + print("\n" + "=" * 60) + print("测试4: 验证 QPS 调度器完整流程") + print("=" * 60) + + request_sequence = [] + lock = threading.Lock() + + class FlowTestSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_DEFAULT": 2, # 2 QPS + "DOMAIN_RATE_LIMIT_RULES": {}, + "DOMAIN_RATE_LIMIT_MAX_PREFETCH": 10, + "SPIDER_THREAD_COUNT": 2, + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + for i in range(6): + yield Request(f"https://flow-test.com/page{i}", auto_request=False) + + def parse(self, request, response): + with lock: + request_sequence.append({ + "url": request.url, + "time": time.time() + }) + + def download_midware(self, request): + return MockResponse(request.url) + + spider = FlowTestSpider() + + print(f"配置: 2 QPS, 6个请求, 2线程") + print(f"预期: 第1个立即执行,后续每0.5秒执行1个") + print(f"预期总时间: 约2.5秒((6-1)/2 = 2.5秒)") + + start_time = time.time() + spider.run() + total_time = time.time() - start_time + + print(f"\n实际总耗时: {total_time:.2f}秒") + print(f"处理请求数: {len(request_sequence)}") + + # 分析请求间隔 + if len(request_sequence) >= 2: + request_sequence.sort(key=lambda x: x["time"]) + intervals = [] + for i in range(1, len(request_sequence)): + interval = request_sequence[i]["time"] - request_sequence[i-1]["time"] + intervals.append(interval) + + avg_interval = sum(intervals) / 
len(intervals) + print(f"\n请求间隔分析:") + print(f" 平均间隔: {avg_interval:.3f}秒 (期望: ~0.5秒)") + + # 验证间隔是否合理 + if 0.4 <= avg_interval <= 0.6: + print(" ✅ 请求间隔符合预期") + else: + print(f" ⚠️ 请求间隔偏离预期") + + # 打印调度器统计 + if spider._qps_scheduler: + stats = spider._qps_scheduler.get_stats() + print(f"\n调度器统计:") + print(f" 提交请求数: {stats['submitted']}") + print(f" 立即执行数: {stats['immediate']}") + print(f" 延迟执行数: {stats['delayed']}") + print(f" 就绪请求数: {stats['ready']}") + + +def test_multi_domain_real_spider(): + """测试5:多域名真实场景测试""" + print("\n" + "=" * 60) + print("测试5: 多域名真实场景(百度、搜狗、搜狐、腾讯)") + print("=" * 60) + + domain_times = defaultdict(list) + lock = threading.Lock() + + class MultiDomainSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_DEFAULT": 0, + "DOMAIN_RATE_LIMIT_RULES": { + "www.baidu.com": 2, + "www.sogou.com": 3, + "www.sohu.com": 4, + "www.qq.com": 5, + }, + "DOMAIN_RATE_LIMIT_MAX_PREFETCH": 50, + "SPIDER_THREAD_COUNT": 32, # 多线程 + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + domains = ["www.baidu.com", "www.sogou.com", "www.sohu.com", "www.qq.com"] + for domain in domains: + for i in range(8): + yield Request(f"https://{domain}/page{i}", auto_request=False) + + def parse(self, request, response): + domain = request.url.split("/")[2] + with lock: + domain_times[domain].append(time.time()) + + def download_midware(self, request): + return MockResponse(request.url) + + spider = MultiDomainSpider() + rules = spider.__custom_setting__["DOMAIN_RATE_LIMIT_RULES"] + + print(f"配置: {rules}") + print(f"每域名8个请求,共32个请求,8线程") + + start_time = time.time() + spider.run() + total_time = time.time() - start_time + + print(f"\n总耗时: {total_time:.2f}秒") + print(f"总请求数: {sum(len(v) for v in domain_times.values())}") + print("\n各域名QPS验证:") + print("-" * 50) + + all_passed = True + for domain in sorted(domain_times.keys()): + times = sorted(domain_times[domain]) + if len(times) >= 2: + time_span = times[-1] - times[0] + actual_qps = (len(times) - 1) / time_span if time_span > 0 else 0 + configured_qps = rules.get(domain, 0) + diff = abs(actual_qps - configured_qps) + status = "✅" if diff < 0.5 else "⚠️" + if diff >= 0.5: + all_passed = False + print(f" {status} {domain}: 配置={configured_qps} QPS, 实际={actual_qps:.2f} QPS") + + if all_passed: + print("\n✅ 所有域名 QPS 控制精确") + else: + print("\n⚠️ 部分域名 QPS 控制存在偏差") + + +if __name__ == "__main__": + # 运行所有测试 + test_qps_enabled_spider() + test_qps_disabled_spider() + test_performance_comparison() + test_qps_scheduler_flow() + test_multi_domain_real_spider() + + print("\n" + "=" * 60) + print("所有 feapder 集成测试完成!") + print("=" * 60) diff --git a/tests/qps-scheduler/test_mixed_qps_comparison.py b/tests/qps-scheduler/test_mixed_qps_comparison.py new file mode 100644 index 0000000..65d5302 --- /dev/null +++ b/tests/qps-scheduler/test_mixed_qps_comparison.py @@ -0,0 +1,294 @@ +# -*- coding: utf-8 -*- +""" +混合QPS限制对比测试 + +测试①:QPS开启,部分域名限制、部分域名不限制 +测试②:QPS关闭,测试不限制的域名 + +对比: +- ①中限制域名的QPS是否被精确控制 +- ①中不限制域名 vs ②中同域名的性能差异 + +Author: ShellMonster +Created: 2024 +""" + +import time +import threading +from collections import defaultdict + +from feapder.core.schedulers import QPSScheduler +from feapder.utils.rate_limiter import DomainRateLimiter +from feapder.network.request import Request + + +# 模拟处理时间(毫秒) +PROCESS_TIME_MS = 10 + + +def run_mixed_qps_test(): + """ + 测试①:QPS开启,混合限制 + - baidu.com: 限制 2 QPS + - sogou.com: 不限制(default_qps=0) + """ + print("=" * 70) + print("测试①:QPS开启,混合限制(baidu=2QPS,sogou=不限制)") + print("=" * 
70) + + num_requests_per_domain = 20 + num_threads = 32 + + # 创建限流器和调度器 + rate_limiter = DomainRateLimiter( + rules={"www.baidu.com": 2}, # baidu 限制 2 QPS + default_qps=0, # 默认不限制 + ) + scheduler = QPSScheduler(rate_limiter=rate_limiter, max_prefetch=100) + scheduler.start() + + # 记录每个域名的请求获取时间(从就绪队列获取的时间) + request_times = defaultdict(list) + lock = threading.Lock() + completed = {"count": 0} + + def consumer(): + """消费者线程:从调度器获取请求""" + while True: + request = scheduler.get_ready_request(timeout=0.5) + if request: + domain = "baidu" if "baidu" in request.url else "sogou" + with lock: + request_times[domain].append(time.time()) + completed["count"] += 1 + # 模拟处理时间 + time.sleep(PROCESS_TIME_MS / 1000) + elif completed["count"] >= num_requests_per_domain * 2: + break + + # 提交所有请求 + print(f"配置: {num_threads}线程, 每域名{num_requests_per_domain}请求") + print(f" - www.baidu.com: 限制 2 QPS") + print(f" - www.sogou.com: 不限制 (default=0)") + print(f" - 模拟处理时间: {PROCESS_TIME_MS}ms") + + start = time.time() + + # 交替提交两个域名的请求 + for i in range(num_requests_per_domain): + scheduler.submit(Request(f"https://www.baidu.com/page{i}")) + scheduler.submit(Request(f"https://www.sogou.com/page{i}")) + + # 启动消费者线程 + consumers = [] + for _ in range(num_threads): + t = threading.Thread(target=consumer) + t.start() + consumers.append(t) + + # 等待所有消费者完成 + for t in consumers: + t.join(timeout=30) + + duration = time.time() - start + scheduler.stop() + + print(f"\n总耗时: {duration:.2f}秒") + print(f"调度器统计: {scheduler.get_stats()}") + + results = {} + for domain in ["baidu", "sogou"]: + times = sorted(request_times[domain]) + count = len(times) + if count >= 2: + time_span = times[-1] - times[0] + actual_qps = (count - 1) / time_span if time_span > 0 else float('inf') + else: + time_span = 0 + actual_qps = 0 + + results[domain] = { + "count": count, + "time_span": time_span, + "actual_qps": actual_qps, + } + + print(f"\n{domain}.com:") + print(f" 处理请求: {count}") + print(f" 时间跨度: {time_span:.3f}秒") + print(f" 实际QPS: {actual_qps:.2f}") + + # 验证 baidu QPS 是否被控制 + baidu_qps = results["baidu"]["actual_qps"] + if 1.8 <= baidu_qps <= 2.2: + print(f"\n✅ baidu.com QPS控制精确 (配置2, 实际{baidu_qps:.2f})") + else: + print(f"\n⚠️ baidu.com QPS控制有偏差 (配置2, 实际{baidu_qps:.2f})") + + return results + + +def run_qps_disabled_test(target_domain="sogou"): + """ + 测试②:QPS关闭,直接从队列获取(模拟原始流程) + """ + print("\n" + "=" * 70) + print(f"测试②:QPS关闭,测试 {target_domain}.com(与测试①中不限制域名对比)") + print("=" * 70) + + from queue import Queue + + num_requests = 20 + num_threads = 32 + + # 使用普通队列模拟QPS关闭时的行为 + request_queue = Queue() + request_times = [] + lock = threading.Lock() + completed = {"count": 0} + + def consumer(): + """消费者线程""" + while True: + try: + request = request_queue.get(timeout=0.5) + with lock: + request_times.append(time.time()) + completed["count"] += 1 + # 模拟处理时间 + time.sleep(PROCESS_TIME_MS / 1000) + except: + if completed["count"] >= num_requests: + break + + print(f"配置: {num_threads}线程, {num_requests}请求, QPS关闭") + print(f" - 模拟处理时间: {PROCESS_TIME_MS}ms") + + start = time.time() + + # 提交所有请求 + for i in range(num_requests): + request_queue.put(Request(f"https://www.{target_domain}.com/page{i}")) + + # 启动消费者线程 + consumers = [] + for _ in range(num_threads): + t = threading.Thread(target=consumer) + t.start() + consumers.append(t) + + # 等待所有消费者完成 + for t in consumers: + t.join(timeout=30) + + duration = time.time() - start + + request_times.sort() + count = len(request_times) + if count >= 2: + time_span = request_times[-1] - request_times[0] + 
actual_qps = (count - 1) / time_span if time_span > 0 else float('inf') + else: + time_span = 0 + actual_qps = 0 + + print(f"\n总耗时: {duration:.2f}秒") + print(f"处理请求: {count}") + print(f"时间跨度: {time_span:.3f}秒") + print(f"实际QPS: {actual_qps:.2f}") + + return { + "count": count, + "time_span": time_span, + "actual_qps": actual_qps, + "duration": duration, + } + + +def run_comparison(): + """运行完整对比测试""" + print("\n" + "=" * 70) + print("域名级QPS混合限制对比测试") + print("=" * 70) + + # 测试①:QPS开启,混合限制 + results_mixed = run_mixed_qps_test() + + # 测试②:QPS关闭 + results_disabled = run_qps_disabled_test("sogou") + + # 对比分析 + print("\n" + "=" * 70) + print("对比分析结果") + print("=" * 70) + + print("\n【1】QPS限制精度验证(baidu.com, 配置2QPS)") + print("-" * 50) + baidu_qps = results_mixed["baidu"]["actual_qps"] + print(f" 配置QPS: 2") + print(f" 实际QPS: {baidu_qps:.2f}") + qps_error = abs(baidu_qps - 2) / 2 * 100 if baidu_qps != float('inf') else 100 + print(f" 误差: {qps_error:.1f}%") + if qps_error < 10: + print(f" ✅ QPS限制精确控制") + else: + print(f" ⚠️ QPS限制有偏差") + + print("\n【2】不限制域名性能对比(sogou.com)") + print("-" * 50) + sogou_qps_enabled = results_mixed["sogou"]["actual_qps"] + sogou_qps_disabled = results_disabled["actual_qps"] + sogou_time_enabled = results_mixed["sogou"]["time_span"] + sogou_time_disabled = results_disabled["time_span"] + + print(f" QPS开启(不限制域名):") + print(f" 时间跨度: {sogou_time_enabled:.3f}秒") + print(f" 实际QPS: {sogou_qps_enabled:.2f}") + print(f" QPS关闭:") + print(f" 时间跨度: {sogou_time_disabled:.3f}秒") + print(f" 实际QPS: {sogou_qps_disabled:.2f}") + + # 用时间跨度来对比性能更准确 + if sogou_time_disabled > 0 and sogou_time_enabled > 0: + # 时间跨度差异 + time_diff_pct = ((sogou_time_enabled - sogou_time_disabled) / sogou_time_disabled) * 100 + print(f"\n 时间跨度差异: {time_diff_pct:.1f}% {'(QPS架构更慢)' if time_diff_pct > 0 else '(QPS架构更快)'}") + + if abs(time_diff_pct) < 10: + print(f" ✅ QPS架构对不限制域名性能影响很小(<10%)") + elif abs(time_diff_pct) < 20: + print(f" ⚠️ QPS架构对不限制域名有一定性能影响(10-20%)") + else: + print(f" ❌ QPS架构对不限制域名性能影响较大(>{abs(time_diff_pct):.0f}%)") + elif sogou_time_enabled == 0 and sogou_time_disabled == 0: + print(f"\n ✅ 两种模式时间跨度都接近0,说明请求几乎瞬间被处理(不限制情况下)") + + print("\n【3】总结") + print("-" * 50) + print(f" • 有限制域名(baidu):") + print(f" - 配置QPS: 2") + print(f" - 实际QPS: {baidu_qps:.2f}") + print(f" - 时间跨度: {results_mixed['baidu']['time_span']:.2f}秒") + print(f" • 无限制域名(sogou): ") + print(f" - QPS开启: 时间跨度 {sogou_time_enabled:.3f}秒") + print(f" - QPS关闭: 时间跨度 {sogou_time_disabled:.3f}秒") + + # 额外验证:baidu的时间跨度应该约为 (20-1)/2 = 9.5秒 + if results_mixed["baidu"]["count"] > 1: + expected_baidu_time = (results_mixed["baidu"]["count"] - 1) / 2 + actual_baidu_time = results_mixed["baidu"]["time_span"] + print(f"\n • baidu时间跨度验证:") + print(f" - 预期: {expected_baidu_time:.1f}秒 ({results_mixed['baidu']['count']-1}请求 / 2QPS)") + print(f" - 实际: {actual_baidu_time:.1f}秒") + time_error = abs(actual_baidu_time - expected_baidu_time) / expected_baidu_time * 100 + if time_error < 5: + print(f" - ✅ 误差 {time_error:.1f}%") + else: + print(f" - ⚠️ 误差 {time_error:.1f}%") + + +if __name__ == "__main__": + run_comparison() + print("\n" + "=" * 70) + print("混合QPS对比测试完成!") + print("=" * 70) diff --git a/tests/qps-scheduler/test_multi_domain_qps.py b/tests/qps-scheduler/test_multi_domain_qps.py new file mode 100644 index 0000000..4737545 --- /dev/null +++ b/tests/qps-scheduler/test_multi_domain_qps.py @@ -0,0 +1,430 @@ +# -*- coding: utf-8 -*- +""" +多域名、多线程QPS限流测试 + +测试目标: +1. 验证各域名的实际QPS是否符合配置 +2. 验证不会出现"刚放进去又拿出来"的情况 +3. 
多线程并发场景下的正确性 + +Author: ShellMonster +Created: 2024 +""" + +import time +import threading +from collections import defaultdict +from feapder.utils.rate_limiter import DomainRateLimiter +from feapder.core.schedulers import QPSScheduler + + +class MockRequest: + """模拟请求对象""" + def __init__(self, url, submit_time=None): + self.url = url + self.submit_time = submit_time or time.time() # 记录提交时间 + + +def test_multi_domain_qps(): + """测试多域名QPS控制精度""" + print("=" * 60) + print("测试1: 多域名QPS控制精度测试") + print("=" * 60) + + # 配置不同域名的QPS + rules = { + "www.baidu.com": 2, # 百度 2 QPS + "www.sogou.com": 3, # 搜狗 3 QPS + "www.sohu.com": 4, # 搜狐 4 QPS + "www.qq.com": 5, # 腾讯 5 QPS + "*.taobao.com": 2, # 淘宝通配符 2 QPS + } + + rate_limiter = DomainRateLimiter(rules=rules, default_qps=10) + scheduler = QPSScheduler(rate_limiter, max_prefetch=100) + scheduler.start() + + # 统计每个域名的请求完成时间 + domain_request_times = defaultdict(list) + lock = threading.Lock() + + # 每个域名生成的请求数 + requests_per_domain = 10 + + domains = [ + "www.baidu.com", + "www.sogou.com", + "www.sohu.com", + "www.qq.com", + "item.taobao.com", # 测试通配符 + "detail.taobao.com", # 测试通配符 + ] + + # 生产者:提交请求 + def submit_requests(): + for domain in domains: + for i in range(requests_per_domain): + url = f"https://{domain}/page{i}" + request = MockRequest(url) + scheduler.submit(request) + print(f"已提交 {len(domains) * requests_per_domain} 个请求") + + # 消费者:获取并执行请求 + total_requests = len(domains) * requests_per_domain + consumed = [0] + + def consume_requests(): + while consumed[0] < total_requests: + request = scheduler.get_ready_request(timeout=0.5) + if request: + ready_time = time.time() + domain = DomainRateLimiter.extract_domain(request.url) + with lock: + domain_request_times[domain].append(ready_time) + consumed[0] += 1 + + # 启动消费者线程(模拟多线程爬虫) + num_consumers = 32 + consumer_threads = [] + for i in range(num_consumers): + t = threading.Thread(target=consume_requests, name=f"Consumer-{i}") + consumer_threads.append(t) + t.start() + + # 提交请求 + start_time = time.time() + submit_requests() + + # 等待所有消费者完成 + for t in consumer_threads: + t.join(timeout=30) + + total_time = time.time() - start_time + scheduler.stop() + + print(f"\n总耗时: {total_time:.2f}秒") + print(f"处理请求数: {consumed[0]}") + print("\n各域名实际QPS分析:") + print("-" * 60) + + for domain in sorted(domain_request_times.keys()): + times = sorted(domain_request_times[domain]) + if len(times) >= 2: + # 计算实际QPS(请求数 / 时间跨度) + time_span = times[-1] - times[0] + if time_span > 0: + actual_qps = (len(times) - 1) / time_span + else: + actual_qps = float('inf') + + # 计算平均间隔 + intervals = [times[i+1] - times[i] for i in range(len(times)-1)] + avg_interval = sum(intervals) / len(intervals) if intervals else 0 + + configured_qps = rate_limiter.get_qps_limit(domain) + expected_interval = 1.0 / configured_qps if configured_qps > 0 else 0 + + print(f"{domain}:") + print(f" 配置QPS: {configured_qps}, 实际QPS: {actual_qps:.2f}") + print(f" 期望间隔: {expected_interval:.3f}s, 实际平均间隔: {avg_interval:.3f}s") + print(f" 请求数: {len(times)}") + else: + print(f"{domain}: 请求数不足,无法计算QPS") + + print() + + +def test_no_immediate_return(): + """测试不会出现'刚放进去又拿出来'的问题""" + print("=" * 60) + print("测试2: 验证不会'刚放进去又拿出来'") + print("=" * 60) + + rate_limiter = DomainRateLimiter( + rules={"test.com": 2}, # 2 QPS,即每0.5秒一个 + default_qps=0 + ) + scheduler = QPSScheduler(rate_limiter, max_prefetch=100) + scheduler.start() + + # 先消耗掉初始令牌 + for i in range(2): + req = MockRequest(f"https://test.com/init{i}") + scheduler.submit(req) + + time.sleep(0.1) # 等待调度器处理 + + # 
消费掉初始的2个 + scheduler.get_ready_request(timeout=0.5) + scheduler.get_ready_request(timeout=0.5) + + print("初始令牌已消耗完毕,现在测试新请求的延迟...") + + # 记录提交和获取时间 + results = [] + + for i in range(5): + submit_time = time.time() + req = MockRequest(f"https://test.com/page{i}", submit_time) + scheduler.submit(req) + + # 立即尝试获取 + ready_req = scheduler.get_ready_request(timeout=2.0) + ready_time = time.time() + + if ready_req: + delay = ready_time - submit_time + results.append({ + 'index': i, + 'submit_time': submit_time, + 'ready_time': ready_time, + 'delay': delay + }) + print(f" 请求{i}: 提交后 {delay:.3f}s 后获取到") + + scheduler.stop() + + # 验证:除了第一个可能立即获取,后续的应该都有延迟 + immediate_count = sum(1 for r in results if r['delay'] < 0.1) + delayed_count = sum(1 for r in results if r['delay'] >= 0.1) + + print(f"\n立即获取: {immediate_count}个, 延迟获取: {delayed_count}个") + + if delayed_count >= 4: + print("✅ 测试通过:请求正确地被延迟处理") + else: + print("⚠️ 警告:可能存在'刚放进去又拿出来'的问题") + + print() + + +def test_concurrent_producers_consumers(): + """测试多生产者多消费者场景""" + print("=" * 60) + print("测试3: 多生产者多消费者并发测试") + print("=" * 60) + + rules = { + "www.baidu.com": 5, + "www.sogou.com": 5, + "www.sohu.com": 5, + "www.qq.com": 5, + } + + rate_limiter = DomainRateLimiter(rules=rules, default_qps=0) + scheduler = QPSScheduler(rate_limiter, max_prefetch=50) + scheduler.start() + + domains = list(rules.keys()) + requests_per_producer = 20 + num_producers = 8 + num_consumers = 32 + + submitted = [0] + consumed = [0] + submit_lock = threading.Lock() + consume_lock = threading.Lock() + + # 记录每个请求的提交时间和完成时间 + request_records = {} + records_lock = threading.Lock() + + total_requests = num_producers * requests_per_producer + + def producer(producer_id): + for i in range(requests_per_producer): + domain = domains[i % len(domains)] + url = f"https://{domain}/p{producer_id}_r{i}" + req = MockRequest(url) + + with records_lock: + request_records[url] = {'submit': time.time(), 'ready': None} + + scheduler.submit(req) + + with submit_lock: + submitted[0] += 1 + + def consumer(consumer_id): + while True: + with consume_lock: + if consumed[0] >= total_requests: + break + + req = scheduler.get_ready_request(timeout=0.5) + if req: + ready_time = time.time() + with records_lock: + if req.url in request_records: + request_records[req.url]['ready'] = ready_time + + with consume_lock: + consumed[0] += 1 + + start_time = time.time() + + # 启动生产者 + producer_threads = [] + for i in range(num_producers): + t = threading.Thread(target=producer, args=(i,)) + producer_threads.append(t) + t.start() + + # 启动消费者 + consumer_threads = [] + for i in range(num_consumers): + t = threading.Thread(target=consumer, args=(i,)) + consumer_threads.append(t) + t.start() + + # 等待完成 + for t in producer_threads: + t.join() + for t in consumer_threads: + t.join(timeout=30) + + total_time = time.time() - start_time + scheduler.stop() + + print(f"生产者数: {num_producers}, 消费者数: {num_consumers}") + print(f"总请求数: {total_requests}") + print(f"已提交: {submitted[0]}, 已消费: {consumed[0]}") + print(f"总耗时: {total_time:.2f}秒") + + # 分析延迟 + delays = [] + for url, record in request_records.items(): + if record['ready']: + delay = record['ready'] - record['submit'] + delays.append(delay) + + if delays: + avg_delay = sum(delays) / len(delays) + max_delay = max(delays) + min_delay = min(delays) + print(f"\n延迟统计:") + print(f" 平均延迟: {avg_delay:.3f}s") + print(f" 最小延迟: {min_delay:.3f}s") + print(f" 最大延迟: {max_delay:.3f}s") + + # 按域名统计 + print(f"\n各域名QPS验证:") + domain_times = defaultdict(list) + for url, record in 
request_records.items(): + if record['ready']: + domain = DomainRateLimiter.extract_domain(url) + domain_times[domain].append(record['ready']) + + for domain in sorted(domain_times.keys()): + times = sorted(domain_times[domain]) + if len(times) >= 2: + time_span = times[-1] - times[0] + actual_qps = (len(times) - 1) / time_span if time_span > 0 else 0 + configured = rules.get(domain, 0) + diff = abs(actual_qps - configured) + status = "✅" if diff < 1 else "⚠️" + print(f" {status} {domain}: 配置={configured} QPS, 实际={actual_qps:.2f} QPS") + + print() + + +def test_strict_qps_measurement(): + """精确测量QPS""" + print("=" * 60) + print("测试4: 精确QPS测量(单域名)") + print("=" * 60) + + target_qps = 5 + test_duration = 3 # 测试3秒 + + rate_limiter = DomainRateLimiter( + rules={"measure.com": target_qps}, + default_qps=0 + ) + scheduler = QPSScheduler(rate_limiter, max_prefetch=100) + scheduler.start() + + request_times = [] + lock = threading.Lock() + stop_flag = [False] + + # 持续提交请求 + def producer(): + i = 0 + while not stop_flag[0]: + req = MockRequest(f"https://measure.com/page{i}") + scheduler.submit(req, block=False) + i += 1 + time.sleep(0.01) # 避免提交太快 + + # 持续消费并记录时间 + def consumer(): + while not stop_flag[0]: + req = scheduler.get_ready_request(timeout=0.1) + if req: + with lock: + request_times.append(time.time()) + + # 启动线程 + num_consumers = 32 + producer_thread = threading.Thread(target=producer) + consumer_threads = [threading.Thread(target=consumer) for _ in range(num_consumers)] + + producer_thread.start() + for t in consumer_threads: + t.start() + + # 运行指定时间 + time.sleep(test_duration) + stop_flag[0] = True + + producer_thread.join() + for t in consumer_threads: + t.join() + + scheduler.stop() + + # 分析结果 + if len(request_times) >= 2: + request_times.sort() + time_span = request_times[-1] - request_times[0] + actual_qps = (len(request_times) - 1) / time_span if time_span > 0 else 0 + + # 计算每秒的请求数 + second_counts = defaultdict(int) + base_time = int(request_times[0]) + for t in request_times: + second = int(t) - base_time + second_counts[second] += 1 + + print(f"配置QPS: {target_qps}") + print(f"测试时长: {time_span:.2f}秒") + print(f"总请求数: {len(request_times)}") + print(f"实际QPS: {actual_qps:.2f}") + print(f"\n每秒请求数分布:") + for sec in sorted(second_counts.keys()): + count = second_counts[sec] + bar = "█" * count + status = "✅" if abs(count - target_qps) <= 1 else "⚠️" + print(f" 第{sec}秒: {count} 个 {status} {bar}") + + # 验证 + deviation = abs(actual_qps - target_qps) / target_qps * 100 + print(f"\nQPS偏差: {deviation:.1f}%") + if deviation < 10: + print("✅ QPS控制精度良好") + else: + print("⚠️ QPS控制存在较大偏差") + + print() + + +if __name__ == "__main__": + test_multi_domain_qps() + test_no_immediate_return() + test_concurrent_producers_consumers() + test_strict_qps_measurement() + + print("=" * 60) + print("所有测试完成!") + print("=" * 60) diff --git a/tests/qps-scheduler/test_performance_only.py b/tests/qps-scheduler/test_performance_only.py new file mode 100644 index 0000000..9ff4c4a --- /dev/null +++ b/tests/qps-scheduler/test_performance_only.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +""" +QPS 开启/关闭 性能对比测试 + +专门测试 QPS 关闭时是否有性能损失 + +Author: ShellMonster +Created: 2024 +""" + +import time +import threading + +import feapder.setting as setting +from feapder import AirSpider, Request + + +class MockResponse: + """模拟响应对象""" + def __init__(self, url): + self.url = url + self.status_code = 200 + self.text = f"Mock response for {url}" + self.content = self.text.encode() + + +def run_performance_test(): + """性能对比测试""" + 
print("=" * 70) + print("性能对比测试:QPS 关闭 vs QPS 开启(高限制)") + print("=" * 70) + + num_requests = 200 # 更多请求以减少启动/退出开销的影响 + num_threads = 32 + + # ==================== 测试1:QPS 关闭 ==================== + print(f"\n[测试1] QPS 关闭,{num_requests}个请求,{num_threads}线程") + + times_disabled = [] + lock1 = threading.Lock() + + class DisabledSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": False, + "SPIDER_THREAD_COUNT": num_threads, + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + for i in range(num_requests): + yield Request(f"https://test.com/page{i}", auto_request=False) + + def parse(self, request, response): + with lock1: + times_disabled.append(time.time()) + + def download_midware(self, request): + return MockResponse(request.url) + + spider1 = DisabledSpider() + print(f" _qps_scheduler: {spider1._qps_scheduler}") + + start1 = time.time() + spider1.run() + duration1 = time.time() - start1 + + if len(times_disabled) >= 2: + work_time1 = times_disabled[-1] - times_disabled[0] + throughput1 = (len(times_disabled) - 1) / work_time1 if work_time1 > 0 else 0 + else: + work_time1 = 0 + throughput1 = 0 + + print(f" 总耗时: {duration1:.3f}秒") + print(f" 实际工作时间: {work_time1:.3f}秒") + print(f" 处理请求: {len(times_disabled)}") + print(f" 吞吐量: {throughput1:.1f} 请求/秒") + + # ==================== 测试2:QPS 开启(极高限制,相当于不限制) ==================== + print(f"\n[测试2] QPS 开启(default_qps=10000),{num_requests}个请求,{num_threads}线程") + + times_enabled = [] + lock2 = threading.Lock() + + class EnabledHighSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_DEFAULT": 10000, # 极高限制 + "DOMAIN_RATE_LIMIT_RULES": {}, + "DOMAIN_RATE_LIMIT_MAX_PREFETCH": 200, + "SPIDER_THREAD_COUNT": num_threads, + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + for i in range(num_requests): + yield Request(f"https://test.com/page{i}", auto_request=False) + + def parse(self, request, response): + with lock2: + times_enabled.append(time.time()) + + def download_midware(self, request): + return MockResponse(request.url) + + spider2 = EnabledHighSpider() + print(f" _qps_scheduler: {spider2._qps_scheduler}") + + start2 = time.time() + spider2.run() + duration2 = time.time() - start2 + + if len(times_enabled) >= 2: + work_time2 = times_enabled[-1] - times_enabled[0] + throughput2 = (len(times_enabled) - 1) / work_time2 if work_time2 > 0 else 0 + else: + work_time2 = 0 + throughput2 = 0 + + print(f" 总耗时: {duration2:.3f}秒") + print(f" 实际工作时间: {work_time2:.3f}秒") + print(f" 处理请求: {len(times_enabled)}") + print(f" 吞吐量: {throughput2:.1f} 请求/秒") + + # ==================== 结果对比 ==================== + print("\n" + "=" * 70) + print("性能对比结果") + print("=" * 70) + + print(f"\n{'指标':<20} {'QPS关闭':<15} {'QPS开启':<15} {'差异':<15}") + print("-" * 70) + print(f"{'总耗时':<20} {duration1:.3f}秒{'':<8} {duration2:.3f}秒{'':<8} {(duration2-duration1):.3f}秒") + print(f"{'实际工作时间':<16} {work_time1:.3f}秒{'':<8} {work_time2:.3f}秒{'':<8} {(work_time2-work_time1):.3f}秒") + print(f"{'吞吐量':<20} {throughput1:.1f}/秒{'':<7} {throughput2:.1f}/秒{'':<7}") + + # 关键指标:实际工作时间的差异 + if work_time1 > 0: + overhead_pct = ((work_time2 - work_time1) / work_time1) * 100 + print(f"\n实际工作时间开销: {overhead_pct:.1f}%") + + if abs(overhead_pct) < 15: + print("✅ QPS 架构对性能影响可忽略(<15%)") + elif abs(overhead_pct) < 30: + print("⚠️ QPS 架构有轻微性能影响(15-30%)") + else: + print("❌ QPS 架构性能影响较大(>30%)") + + # 总耗时差异(包含启动/退出开销) + if duration1 > 0: + total_overhead_pct = ((duration2 - duration1) / duration1) * 100 + print(f"总耗时开销: 
{total_overhead_pct:.1f}%(包含启动/退出开销)") + + +if __name__ == "__main__": + run_performance_test() diff --git a/tests/qps-scheduler/test_qps_scheduler.py b/tests/qps-scheduler/test_qps_scheduler.py new file mode 100644 index 0000000..78eb5e9 --- /dev/null +++ b/tests/qps-scheduler/test_qps_scheduler.py @@ -0,0 +1,314 @@ +# -*- coding: utf-8 -*- +""" +QPS调度器单元测试 + +测试 QPSScheduler 的功能正确性。 + +Author: ShellMonster +Created: 2024 +""" + +import time +import threading +import unittest +from unittest.mock import MagicMock + +from feapder.utils.rate_limiter import DomainRateLimiter +from feapder.core.schedulers.qps_scheduler import QPSScheduler, DelayedRequest + + +class MockRequest: + """模拟请求对象""" + def __init__(self, url): + self.url = url + + +class TestDelayedRequest(unittest.TestCase): + """延迟请求包装类测试""" + + def test_ordering(self): + """测试按时间排序""" + req1 = DelayedRequest(1.0, "request1", "domain1") + req2 = DelayedRequest(2.0, "request2", "domain2") + req3 = DelayedRequest(0.5, "request3", "domain3") + + # 按scheduled_time排序 + sorted_reqs = sorted([req1, req2, req3]) + self.assertEqual(sorted_reqs[0].scheduled_time, 0.5) + self.assertEqual(sorted_reqs[1].scheduled_time, 1.0) + self.assertEqual(sorted_reqs[2].scheduled_time, 2.0) + + +class TestQPSScheduler(unittest.TestCase): + """QPS调度器测试""" + + def setUp(self): + """测试前准备""" + self.rate_limiter = DomainRateLimiter( + rules={"test.com": 5}, # 5 QPS + default_qps=10 + ) + + def tearDown(self): + """测试后清理""" + pass + + def test_init(self): + """测试初始化""" + scheduler = QPSScheduler(self.rate_limiter, max_prefetch=50) + self.assertEqual(scheduler.max_prefetch, 50) + self.assertFalse(scheduler._running) + + def test_start_stop(self): + """测试启动和停止""" + scheduler = QPSScheduler(self.rate_limiter) + scheduler.start() + self.assertTrue(scheduler._running) + time.sleep(0.1) # 让调度线程启动 + + scheduler.stop() + self.assertFalse(scheduler._running) + + def test_submit_immediate(self): + """测试立即可执行的请求提交""" + scheduler = QPSScheduler(self.rate_limiter) + scheduler.start() + + # 提交一个请求 + request = MockRequest("https://test.com/page1") + result = scheduler.submit(request) + self.assertTrue(result) + + # 等待调度器处理 + time.sleep(0.1) + + # 应该能立即从就绪队列获取 + ready = scheduler.get_ready_request_nowait() + self.assertIsNotNone(ready) + self.assertEqual(ready.url, "https://test.com/page1") + + scheduler.stop() + + def test_submit_delayed(self): + """测试需要延迟的请求""" + # 使用低QPS + rate_limiter = DomainRateLimiter( + rules={"delay.com": 2}, # 2 QPS + default_qps=0 + ) + scheduler = QPSScheduler(rate_limiter) + scheduler.start() + + # 提交多个请求超过QPS限制 + for i in range(4): + request = MockRequest(f"https://delay.com/page{i}") + scheduler.submit(request) + + # 等待调度器处理 + time.sleep(0.1) + + # 初始只有1个令牌,只有1个应该立即可用 + ready1 = scheduler.get_ready_request_nowait() + self.assertIsNotNone(ready1) + + # 第2个应该还在延迟堆中 + ready2 = scheduler.get_ready_request_nowait() + self.assertIsNone(ready2) + + # 等待令牌恢复后应该能获取更多 + time.sleep(0.6) # 等待超过0.5秒(2 QPS = 每0.5秒1个) + ready2 = scheduler.get_ready_request_nowait() + self.assertIsNotNone(ready2) + + scheduler.stop() + + def test_max_prefetch_backpressure(self): + """测试max_prefetch背压机制""" + rate_limiter = DomainRateLimiter( + rules={"bp.com": 1}, # 1 QPS + default_qps=0 + ) + scheduler = QPSScheduler(rate_limiter, max_prefetch=3) + scheduler.start() + + # 提交3个请求应该成功 + for i in range(3): + request = MockRequest(f"https://bp.com/page{i}") + result = scheduler.submit(request, block=False) + self.assertTrue(result) + + # 等待调度器处理 + time.sleep(0.1) + + # 
第4个请求应该被阻塞(非阻塞模式下返回False) + request4 = MockRequest("https://bp.com/page4") + result = scheduler.submit(request4, block=False) + self.assertFalse(result) + + # 消费一个请求后应该能提交 + scheduler.get_ready_request(timeout=0.1) + result = scheduler.submit(request4, block=False) + self.assertTrue(result) + + scheduler.stop() + + def test_is_empty(self): + """测试空检测""" + scheduler = QPSScheduler(self.rate_limiter) + scheduler.start() + + # 初始为空 + self.assertTrue(scheduler.is_empty()) + + # 提交请求后不为空 + scheduler.submit(MockRequest("https://test.com/page")) + time.sleep(0.1) + self.assertFalse(scheduler.is_empty()) + + # 消费后为空 + scheduler.get_ready_request(timeout=0.5) + time.sleep(0.1) + self.assertTrue(scheduler.is_empty()) + + scheduler.stop() + + def test_stats(self): + """测试统计信息""" + scheduler = QPSScheduler(self.rate_limiter) + scheduler.start() + + # 提交几个请求 + for i in range(3): + scheduler.submit(MockRequest(f"https://test.com/page{i}")) + + time.sleep(0.1) + + stats = scheduler.get_stats() + self.assertEqual(stats['submitted'], 3) + self.assertGreaterEqual(stats['immediate'] + stats['delayed'], 3) + + scheduler.stop() + + def test_multiple_domains(self): + """测试多域名支持""" + rate_limiter = DomainRateLimiter( + rules={ + "domain1.com": 2, + "domain2.com": 3 + }, + default_qps=5 + ) + scheduler = QPSScheduler(rate_limiter) + scheduler.start() + + # 提交不同域名的请求 + requests = [ + MockRequest("https://domain1.com/1"), + MockRequest("https://domain1.com/2"), + MockRequest("https://domain2.com/1"), + MockRequest("https://domain2.com/2"), + MockRequest("https://domain2.com/3"), + ] + + for req in requests: + scheduler.submit(req) + + time.sleep(0.2) + + # 每个域名初始有1个令牌,所以立即可用的是2个(domain1.com和domain2.com各1个) + ready_count = 0 + while True: + ready = scheduler.get_ready_request_nowait() + if ready is None: + break + ready_count += 1 + + self.assertEqual(ready_count, 2) + + # 等待更长时间让所有请求就绪 + time.sleep(1.5) + while True: + ready = scheduler.get_ready_request_nowait() + if ready is None: + break + ready_count += 1 + + # 所有5个请求都应该被处理了 + self.assertEqual(ready_count, 5) + + scheduler.stop() + + +class TestQPSSchedulerConcurrency(unittest.TestCase): + """并发测试""" + + def test_concurrent_submit(self): + """测试并发提交""" + rate_limiter = DomainRateLimiter( + rules={"concurrent.com": 100}, + default_qps=0 + ) + scheduler = QPSScheduler(rate_limiter) + scheduler.start() + + submit_count = [0] + lock = threading.Lock() + + def submit_requests(): + for i in range(20): + request = MockRequest(f"https://concurrent.com/{threading.current_thread().name}/{i}") + if scheduler.submit(request): + with lock: + submit_count[0] += 1 + + threads = [threading.Thread(target=submit_requests) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join() + + # 等待处理完成 + time.sleep(0.5) + + self.assertEqual(submit_count[0], 100) + + scheduler.stop() + + def test_concurrent_consume(self): + """测试并发消费""" + rate_limiter = DomainRateLimiter( + rules={"consume.com": 100}, + default_qps=0 + ) + scheduler = QPSScheduler(rate_limiter) + scheduler.start() + + # 先提交100个请求 + for i in range(100): + scheduler.submit(MockRequest(f"https://consume.com/page{i}")) + + time.sleep(0.2) + + consumed = [] + lock = threading.Lock() + + def consume_requests(): + for _ in range(25): + request = scheduler.get_ready_request(timeout=1.0) + if request: + with lock: + consumed.append(request) + + threads = [threading.Thread(target=consume_requests) for _ in range(4)] + for t in threads: + t.start() + for t in threads: + t.join() + + 
self.assertEqual(len(consumed), 100) + + scheduler.stop() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/qps-scheduler/test_rate_limiter.py b/tests/qps-scheduler/test_rate_limiter.py new file mode 100644 index 0000000..68c99dd --- /dev/null +++ b/tests/qps-scheduler/test_rate_limiter.py @@ -0,0 +1,219 @@ +# -*- coding: utf-8 -*- +""" +域名级QPS限流模块单元测试 + +测试 LocalTokenBucket、DomainRateLimiter 的功能正确性。 + +Author: ShellMonster +Created: 2024 +""" + +import time +import threading +import unittest + +from feapder.utils.rate_limiter import ( + LocalTokenBucket, + DomainRateLimiter, +) + + +class TestLocalTokenBucket(unittest.TestCase): + """本地令牌桶测试""" + + def test_init_with_valid_qps(self): + """测试正常初始化""" + bucket = LocalTokenBucket(qps=10) + self.assertEqual(bucket.qps, 10) + self.assertEqual(bucket.capacity, 10) + # 初始令牌为1(避免突发) + self.assertEqual(bucket.tokens, 1.0) + + def test_init_with_invalid_qps(self): + """测试无效QPS值""" + with self.assertRaises(ValueError): + LocalTokenBucket(qps=0) + with self.assertRaises(ValueError): + LocalTokenBucket(qps=-1) + + def test_acquire_immediate(self): + """测试立即获取令牌""" + bucket = LocalTokenBucket(qps=10) + # 初始有1个令牌,第1次应该立即返回 + wait_time = bucket.acquire() + self.assertEqual(wait_time, 0) + # 第2次应该需要等待 + wait_time = bucket.acquire() + self.assertGreater(wait_time, 0) + + def test_acquire_with_wait(self): + """测试需要等待的情况""" + bucket = LocalTokenBucket(qps=2) # 每秒2个令牌 + # 消耗初始令牌 + bucket.acquire() + # 第2次应该需要等待约0.5秒(1/2 QPS) + wait_time = bucket.acquire() + self.assertGreater(wait_time, 0) + self.assertLessEqual(wait_time, 0.6) # 约0.5秒 + + def test_token_replenish(self): + """测试令牌补充""" + bucket = LocalTokenBucket(qps=10) + # 消耗初始令牌并预扣一个 + bucket.acquire() # 消耗初始的1个令牌 + bucket.acquire() # 预扣,返回等待时间 + + # 等待一段时间让令牌补充 + time.sleep(0.2) # 0.2秒应该补充2个令牌 + + # 应该能立即获取 + wait_time = bucket.acquire() + self.assertEqual(wait_time, 0) + + def test_thread_safety(self): + """测试线程安全性""" + bucket = LocalTokenBucket(qps=100) + results = [] + + def acquire_tokens(): + for _ in range(10): + wait_time = bucket.acquire() + results.append(wait_time) + + threads = [threading.Thread(target=acquire_tokens) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join() + + # 应该有50个结果 + self.assertEqual(len(results), 50) + + +class TestDomainRateLimiter(unittest.TestCase): + """域名级限流器测试""" + + def test_exact_domain_match(self): + """测试精确域名匹配""" + limiter = DomainRateLimiter( + rules={"www.baidu.com": 5, "www.google.com": 10}, + default_qps=20 + ) + self.assertEqual(limiter.get_qps_limit("www.baidu.com"), 5) + self.assertEqual(limiter.get_qps_limit("www.google.com"), 10) + + def test_wildcard_match(self): + """测试通配符匹配""" + limiter = DomainRateLimiter( + rules={"*.taobao.com": 3}, + default_qps=10 + ) + self.assertEqual(limiter.get_qps_limit("item.taobao.com"), 3) + self.assertEqual(limiter.get_qps_limit("detail.taobao.com"), 3) + self.assertEqual(limiter.get_qps_limit("taobao.com"), 3) # 主域名也匹配 + + def test_default_qps(self): + """测试默认QPS""" + limiter = DomainRateLimiter( + rules={"www.baidu.com": 5}, + default_qps=20 + ) + self.assertEqual(limiter.get_qps_limit("www.unknown.com"), 20) + + def test_zero_default_qps_no_limit(self): + """测试默认QPS为0时不限制""" + limiter = DomainRateLimiter( + rules={"www.baidu.com": 5}, + default_qps=0 + ) + # 未匹配的域名应该返回0(不限制) + wait_time = limiter.acquire("www.unknown.com") + self.assertEqual(wait_time, 0) + + def test_acquire_with_limit(self): + """测试获取令牌""" + limiter = DomainRateLimiter( + rules={"www.test.com": 2}, + 
default_qps=0 + ) + # 第1次应该立即返回(初始有1个令牌) + self.assertEqual(limiter.acquire("www.test.com"), 0) + # 第2次应该需要等待 + wait_time = limiter.acquire("www.test.com") + self.assertGreater(wait_time, 0) + + def test_extract_domain(self): + """测试URL域名提取""" + self.assertEqual( + DomainRateLimiter.extract_domain("https://www.baidu.com/path"), + "www.baidu.com" + ) + self.assertEqual( + DomainRateLimiter.extract_domain("http://example.com:8080/api"), + "example.com" + ) + self.assertEqual( + DomainRateLimiter.extract_domain(""), + "" + ) + self.assertEqual( + DomainRateLimiter.extract_domain(None), + "" + ) + + def test_empty_domain(self): + """测试空域名""" + limiter = DomainRateLimiter(default_qps=10) + self.assertEqual(limiter.get_qps_limit(""), 10) + self.assertEqual(limiter.get_qps_limit(None), 10) + + +class TestDomainRateLimiterQPSAccuracy(unittest.TestCase): + """QPS精度测试""" + + def test_qps_accuracy(self): + """测试QPS控制精度 - 验证预扣机制正确排队""" + limiter = DomainRateLimiter( + rules={"test.com": 10}, # 10 QPS + default_qps=0 + ) + + # 模拟获取多个令牌 + wait_times = [] + for i in range(5): + wait_time = limiter.acquire("test.com") + wait_times.append(wait_time) + + # 第1个应该立即返回(初始有1个令牌) + self.assertEqual(wait_times[0], 0) + # 后续应该需要等待,且等待时间递增(排队效果) + for i in range(1, len(wait_times)): + self.assertGreater(wait_times[i], 0) + if i > 1: + # 由于预扣机制,后续请求的等待时间应该比前一个多约0.1秒 + self.assertGreater(wait_times[i], wait_times[i-1] - 0.01) + + def test_strict_qps_mode(self): + """测试严格QPS模式 - 验证等待时间按1/QPS递增""" + limiter = DomainRateLimiter( + rules={"strict.com": 5}, # 5 QPS = 每0.2秒1个请求 + default_qps=0 + ) + + # 消耗初始令牌 + wait_time0 = limiter.acquire("strict.com") + self.assertEqual(wait_time0, 0) # 第1个立即返回 + + # 后续获取应该返回递增的等待时间 + wait_time1 = limiter.acquire("strict.com") + self.assertAlmostEqual(wait_time1, 0.2, delta=0.05, + msg=f"First wait time: expected ~0.2, got {wait_time1}") + + # 连续获取,由于预扣机制,等待时间应该继续递增 + wait_time2 = limiter.acquire("strict.com") + self.assertAlmostEqual(wait_time2, 0.4, delta=0.05, + msg=f"Second wait time: expected ~0.4, got {wait_time2}") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/qps-scheduler/test_realistic_performance.py b/tests/qps-scheduler/test_realistic_performance.py new file mode 100644 index 0000000..ba4b1ee --- /dev/null +++ b/tests/qps-scheduler/test_realistic_performance.py @@ -0,0 +1,200 @@ +# -*- coding: utf-8 -*- +""" +模拟真实场景的性能对比测试 + +模拟网络延迟,对比 QPS 开启/关闭时的性能差异 + +Author: ShellMonster +Created: 2024 +""" + +import time +import threading +import random + +import feapder.setting as setting +from feapder import AirSpider, Request + + +class MockResponse: + """模拟响应对象""" + def __init__(self, url): + self.url = url + self.status_code = 200 + self.text = f"Mock response for {url}" + self.content = self.text.encode() + + +def run_realistic_test(): + """模拟真实场景的性能测试""" + print("=" * 70) + print("真实场景性能测试:模拟网络延迟 50-100ms") + print("=" * 70) + + num_requests = 100 + num_threads = 32 + network_delay = (0.05, 0.1) # 模拟 50-100ms 网络延迟 + + # ==================== 测试1:QPS 关闭 ==================== + print(f"\n[测试1] QPS 关闭,{num_requests}个请求,{num_threads}线程") + + times_disabled = [] + lock1 = threading.Lock() + + class DisabledSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": False, + "SPIDER_THREAD_COUNT": num_threads, + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + for i in range(num_requests): + yield Request(f"https://test.com/page{i}", auto_request=False) + + def parse(self, request, response): + with lock1: + times_disabled.append(time.time()) 
+ + def download_midware(self, request): + # 模拟网络延迟 + time.sleep(random.uniform(*network_delay)) + return MockResponse(request.url) + + start1 = time.time() + DisabledSpider().run() + duration1 = time.time() - start1 + + if len(times_disabled) >= 2: + work_time1 = times_disabled[-1] - times_disabled[0] + else: + work_time1 = 0 + + print(f" 总耗时: {duration1:.3f}秒") + print(f" 处理请求: {len(times_disabled)}") + + # ==================== 测试2:QPS 开启(极高限制) ==================== + print(f"\n[测试2] QPS 开启(default_qps=10000),{num_requests}个请求,{num_threads}线程") + + times_enabled = [] + lock2 = threading.Lock() + + class EnabledSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_DEFAULT": 10000, + "DOMAIN_RATE_LIMIT_RULES": {}, + "DOMAIN_RATE_LIMIT_MAX_PREFETCH": 100, + "SPIDER_THREAD_COUNT": num_threads, + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + for i in range(num_requests): + yield Request(f"https://test.com/page{i}", auto_request=False) + + def parse(self, request, response): + with lock2: + times_enabled.append(time.time()) + + def download_midware(self, request): + # 模拟网络延迟 + time.sleep(random.uniform(*network_delay)) + return MockResponse(request.url) + + start2 = time.time() + EnabledSpider().run() + duration2 = time.time() - start2 + + if len(times_enabled) >= 2: + work_time2 = times_enabled[-1] - times_enabled[0] + else: + work_time2 = 0 + + print(f" 总耗时: {duration2:.3f}秒") + print(f" 处理请求: {len(times_enabled)}") + + # ==================== 结果对比 ==================== + print("\n" + "=" * 70) + print("性能对比结果(真实场景,有网络延迟)") + print("=" * 70) + + print(f"\n{'指标':<20} {'QPS关闭':<15} {'QPS开启':<15} {'差异':<15}") + print("-" * 70) + print(f"{'总耗时':<20} {duration1:.3f}秒{'':<8} {duration2:.3f}秒{'':<8} {(duration2-duration1):.3f}秒") + + if duration1 > 0: + overhead_pct = ((duration2 - duration1) / duration1) * 100 + print(f"\n总耗时开销: {overhead_pct:.1f}%") + + if abs(overhead_pct) < 5: + print("✅ QPS 架构对真实场景性能几乎无影响(<5%)") + elif abs(overhead_pct) < 15: + print("✅ QPS 架构对真实场景性能影响可接受(5-15%)") + else: + print(f"⚠️ QPS 架构对真实场景性能有影响({overhead_pct:.1f}%)") + + +def run_qps_accuracy_with_delay(): + """带网络延迟的 QPS 精度测试""" + print("\n" + "=" * 70) + print("QPS 精度测试:模拟真实网络延迟场景") + print("=" * 70) + + request_times = [] + lock = threading.Lock() + + class QPSTestSpider(AirSpider): + __custom_setting__ = { + "DOMAIN_RATE_LIMIT_ENABLE": True, + "DOMAIN_RATE_LIMIT_DEFAULT": 5, # 5 QPS + "DOMAIN_RATE_LIMIT_RULES": {}, + "DOMAIN_RATE_LIMIT_MAX_PREFETCH": 50, + "SPIDER_THREAD_COUNT": 8, + "LOG_LEVEL": "ERROR", + } + + def start_requests(self): + for i in range(20): + yield Request(f"https://test.com/page{i}", auto_request=False) + + def parse(self, request, response): + with lock: + request_times.append(time.time()) + + def download_midware(self, request): + # 模拟 100ms 网络延迟 + time.sleep(0.1) + return MockResponse(request.url) + + print(f"配置: 5 QPS, 20个请求, 8线程, 每请求100ms延迟") + print(f"预期: 网络延迟不影响 QPS 控制精度") + + start = time.time() + QPSTestSpider().run() + duration = time.time() - start + + print(f"\n总耗时: {duration:.2f}秒") + print(f"处理请求: {len(request_times)}") + + if len(request_times) >= 2: + request_times.sort() + time_span = request_times[-1] - request_times[0] + actual_qps = (len(request_times) - 1) / time_span if time_span > 0 else 0 + + print(f"配置QPS: 5") + print(f"实际QPS: {actual_qps:.2f}") + + if 4.5 <= actual_qps <= 5.5: + print("✅ QPS 控制精确") + else: + print("⚠️ QPS 控制有偏差") + + +if __name__ == "__main__": + run_realistic_test() + run_qps_accuracy_with_delay() 
+ + print("\n" + "=" * 70) + print("真实场景测试完成!") + print("=" * 70)
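---

For quick reference (not part of the diff above): a minimal usage sketch of the scheduler API that the new tests exercise, wiring `DomainRateLimiter` and `QPSScheduler` together directly. The domain `www.example.com` and its 2-QPS rule are illustrative placeholders; only the constructor arguments and methods already used by the tests (`submit`, `get_ready_request`, `get_stats`, `start`/`stop`) are assumed.

```python
# Minimal sketch, assuming feapder with this PR applied.
import time

from feapder.core.schedulers import QPSScheduler
from feapder.network.request import Request
from feapder.utils.rate_limiter import DomainRateLimiter

# 2 QPS for the example domain; unmatched domains are unlimited (default_qps=0)
rate_limiter = DomainRateLimiter(rules={"www.example.com": 2}, default_qps=0)
scheduler = QPSScheduler(rate_limiter=rate_limiter, max_prefetch=100)
scheduler.start()

# Submit a handful of requests for the rate-limited domain
for i in range(5):
    scheduler.submit(Request(f"https://www.example.com/page{i}"))

# Consume: the first request is released immediately (initial token = 1),
# the rest are spaced ~0.5 s apart by the delay heap.
done = 0
while done < 5:
    req = scheduler.get_ready_request(timeout=1.0)  # returns None on timeout
    if req:
        print(f"{time.time():.3f} ready: {req.url}")
        done += 1

print(scheduler.get_stats())  # submitted / immediate / delayed / ready counters
scheduler.stop()
```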