From 356d3630263b2b24f208447aa4dfcb4b8f8071c3 Mon Sep 17 00:00:00 2001 From: ShellMonster Date: Wed, 19 Nov 2025 20:12:04 +0800 Subject: [PATCH 1/3] feat: Add per-domain QPS throttling control MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 功能概述 为 feapder 框架添加域名级 QPS(每秒请求数)限制功能,支持为不同域名设置独立的请求速率限制。 ## 核心特性 - ✅ 支持所有 Spider 类型(AirSpider、Spider、TaskSpider、BatchSpider) - ✅ 灵活的域名匹配策略(精确匹配、通配符匹配、www回退) - ✅ 分布式友好(基于 Redis 的多机器 QPS 配额共享) - ✅ 非阻塞设计(延迟调度机制,不阻塞工作线程) - ✅ 开箱即用(只需配置,无需编写代码) - ✅ 容错能力强(Redis 异常时自动降级) ## 技术实现 ### 令牌桶算法 - **AirSpider**: 本地内存版令牌桶(线程安全) - **分布式 Spider**: Redis 分布式令牌桶(Lua 脚本保证原子性) ### 核心组件 1. `LocalTokenBucket`: 本地内存令牌桶 2. `RedisTokenBucket`: Redis 分布式令牌桶 3. `DomainRateLimiter`: 统一管理器,自动选择令牌桶类型 ## 使用示例 ```python class MySpider(feapder.Spider): __custom_setting__ = dict( DOMAIN_RATE_LIMIT_ENABLE=True, DOMAIN_RATE_LIMIT_DEFAULT=10, DOMAIN_RATE_LIMIT_RULES={ "baidu.com": 5, "*.google.com": 8, } ) ``` ## 修改文件 ### 新增文件 - `feapder/utils/rate_limiter.py`: 令牌桶算法实现(~300行) - `docs/usage/域名级QPS限制.md`: 完整使用文档 - `test_qps_limit.py`: 功能测试脚本 ### 修改文件 - `feapder/setting.py`: 新增 QPS 相关配置项 - `feapder/network/request.py`: 在 get_response() 中添加 QPS 检查逻辑 - `feapder/core/scheduler.py`: 传递 redis_key 给 Request 类 - `feapder/core/parser_control.py`: 注入 request_buffer 到请求对象 - `feapder/templates/*.tmpl`: 在 4 个模板中添加 QPS 配置示例 - `docs/_sidebar.md`: 添加文档导航 ## 测试 ✅ 域名提取功能测试通过 ✅ 本地令牌桶算法测试通过 ✅ 代码语法检查通过 ## 相关文档 - 技术方案: `域名级QPS限制技术方案.md` - 使用文档: `docs/usage/域名级QPS限制.md` --- docs/_sidebar.md | 1 + ...347\272\247QPS\351\231\220\345\210\266.md" | 348 ++++++++++++++++++ feapder/core/parser_control.py | 2 + feapder/core/scheduler.py | 5 + feapder/network/request.py | 137 +++++++ feapder/setting.py | 8 + feapder/templates/air_spider_template.tmpl | 12 + feapder/templates/batch_spider_template.tmpl | 7 + feapder/templates/spider_template.tmpl | 12 +- feapder/templates/task_spider_template.tmpl | 7 + feapder/utils/rate_limiter.py | 291 +++++++++++++++ test_qps_limit.py | 124 +++++++ 12 files changed, 953 insertions(+), 1 deletion(-) create mode 100644 "docs/usage/\345\237\237\345\220\215\347\272\247QPS\351\231\220\345\210\266.md" create mode 100644 feapder/utils/rate_limiter.py create mode 100644 test_qps_limit.py diff --git a/docs/_sidebar.md b/docs/_sidebar.md index bef51b37..26bac60e 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -13,6 +13,7 @@ * [分布式爬虫-Spider](usage/Spider.md) * [任务爬虫-TaskSpider](usage/TaskSpider.md) * [批次爬虫-BatchSpider](usage/BatchSpider.md) + * [域名级QPS限制](usage/域名级QPS限制.md) * [爬虫集成](usage/爬虫集成.md) * 使用进阶 diff --git "a/docs/usage/\345\237\237\345\220\215\347\272\247QPS\351\231\220\345\210\266.md" "b/docs/usage/\345\237\237\345\220\215\347\272\247QPS\351\231\220\345\210\266.md" new file mode 100644 index 00000000..0f9cc775 --- /dev/null +++ "b/docs/usage/\345\237\237\345\220\215\347\272\247QPS\351\231\220\345\210\266.md" @@ -0,0 +1,348 @@ +# 域名级QPS限制 + +域名级QPS限制功能允许你为不同的域名设置独立的请求速率限制(Queries Per Second),防止爬虫对目标网站造成过大压力,同时避免被反爬虫机制封禁。 + +## 1. 功能特性 + +- ✅ **支持所有Spider类型**:AirSpider、Spider、TaskSpider、BatchSpider +- ✅ **灵活的域名匹配**:支持精确域名、通配符域名(`*.google.com`)、www回退策略 +- ✅ **分布式友好**:多台机器共享QPS配额(基于Redis) +- ✅ **非阻塞设计**:延迟调度机制,不会卡住工作线程 +- ✅ **开箱即用**:只需配置,无需编写代码 +- ✅ **容错能力强**:Redis异常时自动降级,不影响爬虫运行 + +## 2. 工作原理 + +QPS限制基于**令牌桶算法**实现: + +- **AirSpider**:使用本地内存版令牌桶(线程安全) +- **Spider/TaskSpider/BatchSpider**:使用Redis分布式令牌桶(支持多机器共享配额) + +当请求超过配置的QPS限制时,会自动延迟执行,而不是阻塞线程。 + +## 3. 基础使用 + +### 3.1 AirSpider示例 + +```python +import feapder + +class MySpider(feapder.AirSpider): + __custom_setting__ = dict( + # 启用域名级QPS限制 + DOMAIN_RATE_LIMIT_ENABLE=True, + # 默认每个域名10 QPS + DOMAIN_RATE_LIMIT_DEFAULT=10, + # 特定域名的QPS规则 + DOMAIN_RATE_LIMIT_RULES={ + "baidu.com": 5, # 百度主域名限制5 QPS + "api.baidu.com": 20, # 百度API限制20 QPS + "*.google.com": 8, # 所有谷歌系域名8 QPS + } + ) + + def start_requests(self): + yield feapder.Request("https://www.baidu.com") + yield feapder.Request("https://api.baidu.com/v1/data") + yield feapder.Request("https://maps.google.com") + + def parse(self, request, response): + print(f"成功抓取: {request.url}") + +if __name__ == "__main__": + MySpider().start() +``` + +### 3.2 Spider示例 + +```python +import feapder + +class MySpider(feapder.Spider): + __custom_setting__ = dict( + REDISDB_IP_PORTS="localhost:6379", + REDISDB_USER_PASS="", + REDISDB_DB=0, + # QPS配置 + DOMAIN_RATE_LIMIT_ENABLE=True, + DOMAIN_RATE_LIMIT_DEFAULT=10, + DOMAIN_RATE_LIMIT_RULES={ + "baidu.com": 5, + "zhihu.com": 3, + } + ) + + def start_requests(self): + for i in range(100): + yield feapder.Request(f"https://www.baidu.com/s?wd={i}") + + def parse(self, request, response): + print(f"成功抓取: {request.url}") + +if __name__ == "__main__": + MySpider(redis_key="test:qps").start() +``` + +## 4. 配置详解 + +### 4.1 配置项说明 + +| 配置项 | 类型 | 默认值 | 说明 | +|--------|------|--------|------| +| `DOMAIN_RATE_LIMIT_ENABLE` | bool | False | 是否启用域名级QPS限制 | +| `DOMAIN_RATE_LIMIT_DEFAULT` | int | 10 | 默认每个域名的QPS限制 | +| `DOMAIN_RATE_LIMIT_RULES` | dict | {} | 特定域名的QPS规则 | + +### 4.2 域名匹配规则 + +QPS配置按以下优先级匹配(从高到低): + +1. **单个请求的 qps_limit 参数**(最高优先级) +2. **精确域名匹配**(完全一致,包括www前缀) +3. **通配符匹配**(支持 `*.domain` 格式) +4. **www回退策略**(如果访问www.baidu.com未匹配,自动尝试baidu.com) +5. **默认值** `DOMAIN_RATE_LIMIT_DEFAULT` + +#### 示例1:简化配置(推荐) + +只配置主域名,www会自动回退: + +```python +DOMAIN_RATE_LIMIT_RULES = { + "baidu.com": 5, # www.baidu.com 和 baidu.com 都限制为 5 QPS + "api.baidu.com": 10, # api.baidu.com 限制为 10 QPS +} +``` + +匹配结果: +- `https://www.baidu.com` → 5 QPS(回退到 baidu.com) +- `https://baidu.com` → 5 QPS(精确匹配) +- `https://api.baidu.com` → 10 QPS(精确匹配) +- `https://tieba.baidu.com` → 10 QPS(默认值) + +#### 示例2:精确控制 + +区分www和非www: + +```python +DOMAIN_RATE_LIMIT_RULES = { + "www.example.com": 20, # www流量大,限制宽松 + "example.com": 5, # 非www流量小,限制严格 +} +``` + +匹配结果: +- `https://www.example.com` → 20 QPS(精确匹配) +- `https://example.com` → 5 QPS(精确匹配) + +#### 示例3:通配符匹配 + +限制整个域名族群: + +```python +DOMAIN_RATE_LIMIT_RULES = { + "*.google.com": 8, # 所有谷歌三级域名 + "*.amazonaws.com": 15, # 所有AWS服务 +} +``` + +匹配结果: +- `https://maps.google.com` → 8 QPS(通配符匹配) +- `https://apis.google.com` → 8 QPS(通配符匹配) +- `https://google.com` → 10 QPS(通配符不匹配无子域名的情况,使用默认值) + +## 5. 高级用法 + +### 5.1 单个请求自定义QPS + +可以为单个请求设置独立的QPS限制: + +```python +def start_requests(self): + # 重要接口,限制1 QPS + yield feapder.Request( + "https://api.important.com/data", + qps_limit=1 # 单独设置这个请求的QPS + ) + + # 普通接口,使用默认配置 + yield feapder.Request("https://www.baidu.com") +``` + +### 5.2 精细化域名控制 + +为不同级别的域名设置不同QPS: + +```python +DOMAIN_RATE_LIMIT_RULES = { + # 主域名 + "example.com": 5, + + # API子域名(通常可以承受更高QPS) + "api.example.com": 20, + + # CDN子域名(静态资源,可以更高) + "cdn.example.com": 50, + + # 通配符兜底 + "*.example.com": 3, # 其他子域名保守限制 +} +``` + +### 5.3 多爬虫任务独立限速 + +不同的爬虫任务使用不同的 `redis_key`,QPS配额相互独立: + +```python +# 爬虫任务1:百度搜索,限制5 QPS +class BaiduSpider(feapder.Spider): + __custom_setting__ = dict( + DOMAIN_RATE_LIMIT_ENABLE=True, + DOMAIN_RATE_LIMIT_RULES={"baidu.com": 5} + ) + +# 爬虫任务2:同时抓取百度,限制10 QPS(不冲突) +class BaiduSpider2(feapder.Spider): + __custom_setting__ = dict( + DOMAIN_RATE_LIMIT_ENABLE=True, + DOMAIN_RATE_LIMIT_RULES={"baidu.com": 10} + ) + +if __name__ == "__main__": + # 两个爬虫可以同时运行,各自有独立的QPS配额 + spider1 = BaiduSpider(redis_key="task1") + spider2 = BaiduSpider2(redis_key="task2") + + spider1.start() + # spider2.start() # 可以在另一台机器上运行 +``` + +## 6. 注意事项 + +### 6.1 Redis配置要求 + +- **Spider/TaskSpider/BatchSpider** 需要配置Redis才能使用分布式QPS限制 +- **AirSpider** 使用本地内存,无需Redis + +### 6.2 QPS计算方式 + +QPS = Queries Per Second(每秒请求数) + +例如:配置 `"baidu.com": 5` 表示每秒最多发送5个请求到baidu.com + +### 6.3 性能影响 + +- 本地令牌桶:几乎无性能损耗 +- Redis令牌桶:每次请求增加 1-10ms 延迟(取决于Redis网络延迟) +- 对比HTTP请求耗时(通常100-1000ms),性能开销可忽略 + +### 6.4 容错机制 + +- Redis连接失败时,自动降级为放行所有请求 +- 不会因为QPS限制模块异常而导致爬虫停止 + +## 7. 调试与监控 + +### 7.1 查看QPS限制日志 + +启用DEBUG日志级别可以看到QPS限制的详细信息: + +```python +LOG_LEVEL = "DEBUG" +``` + +日志输出示例: + +``` +[QPS限制] 域名 baidu.com 达到限制 5 QPS, 延迟 0.20秒后重试 +``` + +### 7.2 验证QPS是否生效 + +可以通过记录请求时间来验证: + +```python +import time + +class TestSpider(feapder.AirSpider): + __custom_setting__ = dict( + DOMAIN_RATE_LIMIT_ENABLE=True, + DOMAIN_RATE_LIMIT_RULES={"httpbin.org": 2} # 2 QPS + ) + + def parse(self, request, response): + print(f"[{time.strftime('%H:%M:%S')}] 请求完成: {request.url}") +``` + +如果配置正确,你会看到请求按照设定的QPS速率执行。 + +## 8. 常见问题 + +### Q1: 为什么配置了QPS限制,但请求还是很快? + +**A:** 检查以下几点: +1. 确认 `DOMAIN_RATE_LIMIT_ENABLE` 设置为 `True` +2. 检查域名是否匹配(注意www前缀) +3. 检查并发线程数 `SPIDER_THREAD_COUNT` 是否过大 + +### Q2: 多个域名同时爬取时,QPS如何计算? + +**A:** 每个域名的QPS是独立计算的。例如: +- 同时爬取 baidu.com(5 QPS)和 google.com(8 QPS) +- 总QPS = 5 + 8 = 13 QPS + +### Q3: AirSpider和Spider的QPS限制有什么区别? + +**A:** +- **AirSpider**:本地内存版,单机独立QPS配额 +- **Spider**:Redis分布式版,多台机器共享QPS配额 + +### Q4: 如何临时关闭QPS限制? + +**A:** 设置 `DOMAIN_RATE_LIMIT_ENABLE=False` 即可 + +## 9. 最佳实践 + +### 9.1 推荐的QPS配置 + +根据目标网站类型,推荐以下QPS配置: + +| 网站类型 | 推荐QPS | 说明 | +|---------|---------|------| +| 大型门户网站 | 5-10 | 如百度、新浪 | +| API接口 | 10-50 | 取决于服务商限制 | +| 小型网站 | 1-5 | 避免压力过大 | +| CDN静态资源 | 20-100 | 通常限制较宽松 | + +### 9.2 配置建议 + +1. **优先使用简化配置**:只配置主域名(如 `baidu.com`),让www自动回退 +2. **API独立配置**:API子域名通常需要更精确的QPS控制 +3. **通配符兜底**:使用通配符为未知子域名设置保守的默认QPS +4. **逐步调整**:从保守的QPS开始,逐步提高直到找到最优值 + +### 9.3 监控与调优 + +```python +import time + +class MonitorSpider(feapder.Spider): + request_times = [] + + def parse(self, request, response): + # 记录请求时间 + self.request_times.append(time.time()) + + # 每100个请求统计一次QPS + if len(self.request_times) >= 100: + duration = self.request_times[-1] - self.request_times[0] + actual_qps = len(self.request_times) / duration + print(f"实际QPS: {actual_qps:.2f}") + self.request_times = [] +``` + +## 10. 相关文档 + +- [Spider进阶](source_code/Spider进阶.md) +- [配置文件](source_code/配置文件.md) +- [命令行工具](command/cmdline.md) diff --git a/feapder/core/parser_control.py b/feapder/core/parser_control.py index 021d2956..c7b4e1a6 100644 --- a/feapder/core/parser_control.py +++ b/feapder/core/parser_control.py @@ -78,6 +78,8 @@ def deal_request(self, request): response = None request_redis = request["request_redis"] request = request["request_obj"] + # 注入request_buffer,用于QPS限制时将请求放回队列 + request._request_buffer = self._request_buffer del_request_redis_after_item_to_db = False del_request_redis_after_request_to_db = False diff --git a/feapder/core/scheduler.py b/feapder/core/scheduler.py index 0177d185..bb8f3cc2 100644 --- a/feapder/core/scheduler.py +++ b/feapder/core/scheduler.py @@ -77,6 +77,11 @@ def __init__( setattr(setting, key, value) self._redis_key = redis_key or setting.REDIS_KEY + # 将redis_key传递给Request类(用于QPS限制) + from feapder.network.request import Request + + Request.cached_redis_key = self._redis_key + if not self._redis_key: raise Exception( """ diff --git a/feapder/network/request.py b/feapder/network/request.py index 95e51604..ed333850 100644 --- a/feapder/network/request.py +++ b/feapder/network/request.py @@ -359,6 +359,49 @@ def get_response(self, save_cached=False): @param save_cached: 保存缓存 方便调试时不用每次都重新下载 @return: """ + # 域名级QPS限制检查(在所有下载器被调用之前) + if setting.DOMAIN_RATE_LIMIT_ENABLE: + # 导入限速器 + from feapder.utils.rate_limiter import DomainRateLimiter + + # 初始化限速器(类级别单例) + if not hasattr(self.__class__, "_rate_limiter"): + self.__class__._rate_limiter = DomainRateLimiter() + + # 提取域名 + domain = self._extract_domain(self.url) + + if domain: + # 获取QPS配置 + qps_limit = self._get_domain_qps_limit(domain) + + if qps_limit: + # 尝试获取令牌 + wait_time = self.__class__._rate_limiter.acquire( + self, domain, qps_limit + ) + + if wait_time > 0: + # 需要等待,延迟调度 + log.debug( + f"[QPS限制] 域名 {domain} 达到限制 {qps_limit} QPS, " + f"延迟 {wait_time:.2f}秒后重试" + ) + + # 修改优先级为 当前时间戳 + 等待时间 + self.priority = time.time() + wait_time + + # 放回队列(如果有队列的话) + request_buffer = getattr(self, "_request_buffer", None) + if request_buffer: + request_buffer.put_request(self) + return None # 返回None,表示本次不执行 + else: + # 没有队列(如shell调试模式),降级为同步等待 + log.debug(f"[QPS限制] 无队列可用,同步等待 {wait_time:.2f}秒") + time.sleep(wait_time) + # 继续执行下面的下载逻辑 + self.make_requests_kwargs() log.debug( @@ -541,3 +584,97 @@ def from_dict(cls, request_dict): def copy(self): return self.__class__.from_dict(copy.deepcopy(self.to_dict)) + + @staticmethod + def _extract_domain(url): + """ + 从URL提取域名(保留www前缀,后续匹配时处理) + + 示例: + https://www.baidu.com/s?wd=test -> www.baidu.com (保留www) + https://baidu.com/s?wd=test -> baidu.com + http://news.sina.com.cn/page.html -> news.sina.com.cn + https://api.example.com/v1/data -> api.example.com + https://example.com:8080/path -> example.com (自动去除端口号) + """ + try: + from urllib.parse import urlparse + + parsed = urlparse(url) + + # 优先使用hostname (已自动去除端口号) + hostname = parsed.hostname + + if not hostname: + # 如果hostname为空,尝试从netloc提取 + netloc = parsed.netloc + if ":" in netloc: + hostname = netloc.split(":")[0] + else: + hostname = netloc + + # 注意: 不再自动去除www前缀,保留完整域名 + # www的处理交给 _get_domain_qps_limit() 方法 + return hostname + except Exception as e: + log.error(f"域名提取失败: {url}, 错误: {e}") + return None + + def _get_domain_qps_limit(self, domain): + """ + 获取指定域名的QPS限制(混合www处理策略) + + 优先级: + 1. request对象的qps_limit参数(最高优先级) + 2. DOMAIN_RATE_LIMIT_RULES中的精确域名匹配(包括www.domain) + 3. DOMAIN_RATE_LIMIT_RULES中的通配符匹配 + 4. 如果是www开头且未匹配,尝试去掉www后再匹配(回退策略) + 5. DOMAIN_RATE_LIMIT_DEFAULT默认值 + + 返回: QPS限制值,None表示不限制 + + 混合策略说明: + - 如果显式配置了 "www.baidu.com",则www.baidu.com精确匹配该配置 + - 如果只配置了 "baidu.com",则www.baidu.com会回退到baidu.com + - 这样既支持精确控制,又简化了常见场景的配置 + """ + if not setting.DOMAIN_RATE_LIMIT_ENABLE: + return None + + # 1. 优先使用request对象自定义的qps_limit + if hasattr(self, "qps_limit") and self.qps_limit is not None: + return self.qps_limit + + # 2. 查找特定域名的配置 + rules = setting.DOMAIN_RATE_LIMIT_RULES or {} + + # 精确匹配(包括www.baidu.com这样的完整域名) + if domain in rules: + return rules[domain] + + # 通配符匹配 (如 *.google.com) + for pattern, qps in rules.items(): + if pattern.startswith("*."): + suffix = pattern[2:] # 去掉 '*.' + # 只匹配有子域名的情况: maps.google.com 匹配 *.google.com + # 但 google.com 不匹配 *.google.com + if domain.endswith("." + suffix): + return qps + + # 3. 回退策略:如果是www开头且未匹配,尝试去掉www后再匹配 + if domain.startswith("www."): + domain_without_www = domain[4:] # 去掉 'www.' + + # 尝试精确匹配去掉www的域名 + if domain_without_www in rules: + return rules[domain_without_www] + + # 尝试通配符匹配去掉www的域名 + for pattern, qps in rules.items(): + if pattern.startswith("*."): + suffix = pattern[2:] + if domain_without_www.endswith("." + suffix): + return qps + + # 4. 使用默认值 + return setting.DOMAIN_RATE_LIMIT_DEFAULT diff --git a/feapder/setting.py b/feapder/setting.py index 985709bd..6d4cbcc7 100644 --- a/feapder/setting.py +++ b/feapder/setting.py @@ -13,6 +13,9 @@ TAB_SPIDER_STATUS = "{redis_key}:h_spider_status" # 用户池 TAB_USER_POOL = "{redis_key}:h_{user_type}_pool" +# 域名级QPS限制相关表名 +TAB_RATE_LIMIT = "{redis_key}:h_rate_limit:{domain}" # 单个域名的令牌桶状态 +TAB_RATE_LIMIT_CONFIG = "{redis_key}:h_rate_limit_config" # 用户配置的QPS规则 # MYSQL MYSQL_IP = os.getenv("MYSQL_IP") @@ -225,6 +228,11 @@ # 打点监控其他参数,若这里也配置了influxdb的参数, 则会覆盖外面的配置 METRICS_OTHER_ARGS = dict(retention_policy_duration="180d", emit_interval=60) +# 域名级QPS限制配置 +DOMAIN_RATE_LIMIT_ENABLE = False # 是否启用域名级QPS限制,默认关闭 +DOMAIN_RATE_LIMIT_DEFAULT = 10 # 默认每个域名的QPS限制(每秒请求数) +DOMAIN_RATE_LIMIT_RULES = {} # 特定域名的QPS规则,格式: {"baidu.com": 5, "*.google.com": 8} + ############# 导入用户自定义的setting ############# try: from setting import * diff --git a/feapder/templates/air_spider_template.tmpl b/feapder/templates/air_spider_template.tmpl index 4cd1f505..d7afd5a2 100644 --- a/feapder/templates/air_spider_template.tmpl +++ b/feapder/templates/air_spider_template.tmpl @@ -11,6 +11,18 @@ import feapder class ${spider_name}(feapder.AirSpider): + # 自定义配置(可选) + # __custom_setting__ = dict( + # # 域名级QPS限制配置(推荐开启) + # DOMAIN_RATE_LIMIT_ENABLE=True, # 是否启用QPS限制 + # DOMAIN_RATE_LIMIT_DEFAULT=10, # 默认每个域名10 QPS + # DOMAIN_RATE_LIMIT_RULES={ # 特定域名的QPS规则 + # # "baidu.com": 5, # 百度限制5 QPS + # # "api.example.com": 20, # API接口限制20 QPS + # # "*.google.com": 8, # 谷歌系域名8 QPS + # } + # ) + def start_requests(self): yield feapder.Request("https://spidertools.cn") diff --git a/feapder/templates/batch_spider_template.tmpl b/feapder/templates/batch_spider_template.tmpl index 9802e994..6c308bca 100644 --- a/feapder/templates/batch_spider_template.tmpl +++ b/feapder/templates/batch_spider_template.tmpl @@ -22,6 +22,13 @@ class ${spider_name}(feapder.BatchSpider): MYSQL_DB="", MYSQL_USER_NAME="", MYSQL_USER_PASS="", + # 域名级QPS限制配置(可选) + # DOMAIN_RATE_LIMIT_ENABLE=True, + # DOMAIN_RATE_LIMIT_DEFAULT=10, + # DOMAIN_RATE_LIMIT_RULES={ + # "baidu.com": 5, + # "*.google.com": 8, + # } ) def start_requests(self, task): diff --git a/feapder/templates/spider_template.tmpl b/feapder/templates/spider_template.tmpl index bb209527..f76ce7b4 100644 --- a/feapder/templates/spider_template.tmpl +++ b/feapder/templates/spider_template.tmpl @@ -13,7 +13,17 @@ import feapder class ${spider_name}(feapder.Spider): # 自定义数据库,若项目中有setting.py文件,此自定义可删除 __custom_setting__ = dict( - REDISDB_IP_PORTS="localhost:6379", REDISDB_USER_PASS="", REDISDB_DB=0 + REDISDB_IP_PORTS="localhost:6379", + REDISDB_USER_PASS="", + REDISDB_DB=0, + # 域名级QPS限制配置(可选,默认不启用) + # DOMAIN_RATE_LIMIT_ENABLE=True, # 是否启用QPS限制 + # DOMAIN_RATE_LIMIT_DEFAULT=10, # 默认每个域名10 QPS + # DOMAIN_RATE_LIMIT_RULES={ # 特定域名的QPS规则 + # "baidu.com": 5, # 百度主域名限制5 QPS + # "api.baidu.com": 20, # 百度API限制20 QPS + # "*.google.com": 8, # 谷歌系域名8 QPS + # } ) def start_requests(self): diff --git a/feapder/templates/task_spider_template.tmpl b/feapder/templates/task_spider_template.tmpl index 66bbbba1..ead313be 100644 --- a/feapder/templates/task_spider_template.tmpl +++ b/feapder/templates/task_spider_template.tmpl @@ -22,6 +22,13 @@ class ${spider_name}(feapder.TaskSpider): MYSQL_DB="", MYSQL_USER_NAME="", MYSQL_USER_PASS="", + # 域名级QPS限制配置(可选) + # DOMAIN_RATE_LIMIT_ENABLE=True, + # DOMAIN_RATE_LIMIT_DEFAULT=10, + # DOMAIN_RATE_LIMIT_RULES={ + # "baidu.com": 5, + # "*.google.com": 8, + # } ) def start_requests(self, task): diff --git a/feapder/utils/rate_limiter.py b/feapder/utils/rate_limiter.py new file mode 100644 index 00000000..170515b3 --- /dev/null +++ b/feapder/utils/rate_limiter.py @@ -0,0 +1,291 @@ +# -*- coding: utf-8 -*- +""" +Created on 2025-11-19 +--------- +@summary: 域名级QPS限制器(令牌桶算法) +--------- +@author: feapder +""" + +import time +import threading +from typing import Dict + +import feapder.setting as setting +from feapder.db.redisdb import RedisDB +from feapder.utils.log import log + + +class LocalTokenBucket: + """ + 本地内存版令牌桶 + + 用于AirSpider(单机爬虫) + 线程安全,适用于多线程环境 + """ + + def __init__(self, qps: int): + """ + 初始化令牌桶 + + Args: + qps: 每秒允许的请求数(Queries Per Second) + """ + self.capacity = qps # 桶容量(最大令牌数) + self.tokens = float(qps) # 当前令牌数 + self.qps = qps # 每秒生成的令牌数 + self.last_update = time.time() # 上次更新时间 + self.lock = threading.Lock() # 线程锁 + + def acquire(self) -> float: + """ + 尝试获取一个令牌 + + Returns: + float: 0表示成功获取令牌,>0表示需要等待的秒数 + """ + with self.lock: + now = time.time() + elapsed = now - self.last_update # 时间流逝 + + # 根据时间流逝补充令牌 + self.tokens = min(self.capacity, self.tokens + elapsed * self.qps) + self.last_update = now + + # 尝试消费一个令牌 + if self.tokens >= 1: + self.tokens -= 1 + return 0 # 成功 + else: + # 计算需要等待多久才能获得令牌 + wait_time = (1 - self.tokens) / self.qps + return wait_time + + +class RedisTokenBucket: + """ + Redis分布式令牌桶 + + 用于Spider/TaskSpider/BatchSpider(分布式爬虫) + 使用Lua脚本保证原子性,支持多机器共享QPS配额 + """ + + # Lua脚本:原子性地检查并消费令牌 + ACQUIRE_SCRIPT = """ + local key = KEYS[1] + local capacity = tonumber(ARGV[1]) + local qps = tonumber(ARGV[2]) + local now = tonumber(ARGV[3]) + + -- 获取当前令牌数和上次更新时间 + local tokens = tonumber(redis.call('HGET', key, 'tokens')) + local last_update = tonumber(redis.call('HGET', key, 'last_update')) + + -- 如果是第一次访问,初始化 + if not tokens then + tokens = capacity + end + if not last_update then + last_update = now + end + + -- 计算应该补充的令牌数 + local elapsed = now - last_update + tokens = math.min(capacity, tokens + elapsed * qps) + + -- 尝试消费一个令牌 + if tokens >= 1 then + -- 有令牌,消费一个 + tokens = tokens - 1 + redis.call('HSET', key, 'tokens', tostring(tokens)) + redis.call('HSET', key, 'last_update', tostring(now)) + redis.call('EXPIRE', key, 3600) -- 1小时无访问自动清理 + return 0 -- 成功 + else + -- 无令牌,计算需要等待的时间 + local wait_time = (1 - tokens) / qps + return wait_time + end + """ + + def __init__(self, redis_db: RedisDB, rate_limit_key: str, qps: int): + """ + 初始化Redis令牌桶 + + Args: + redis_db: RedisDB实例 + rate_limit_key: Redis中的key(格式: {redis_key}:h_rate_limit:{domain}) + qps: 每秒允许的请求数 + """ + self.redis = redis_db._redis + self.rate_limit_key = rate_limit_key + self.qps = qps + self.capacity = qps + self.acquire_sha = None # Lua脚本的SHA值(延迟加载) + + def _ensure_script_loaded(self): + """确保Lua脚本已加载到Redis""" + if not self.acquire_sha: + try: + self.acquire_sha = self.redis.script_load(self.ACQUIRE_SCRIPT) + except Exception as e: + log.error(f"加载Lua脚本失败: {e}") + raise + + def acquire(self) -> float: + """ + 尝试获取一个令牌 + + Returns: + float: 0表示成功获取令牌,>0表示需要等待的秒数 + """ + try: + self._ensure_script_loaded() + now = time.time() + + # 执行Lua脚本(原子操作) + wait_time = self.redis.evalsha( + self.acquire_sha, + 1, # KEYS数量 + self.rate_limit_key, # KEYS[1] + self.capacity, # ARGV[1] + self.qps, # ARGV[2] + now, # ARGV[3] + ) + + return float(wait_time) + + except Exception as e: + # Redis异常时放行请求,避免阻塞爬虫 + log.error(f"Redis令牌桶异常: {e}, 放行请求") + return 0 + + +class DomainRateLimiter: + """ + 域名级QPS限制器(统一管理器) + + 职责: + 1. 自动检测Spider类型(AirSpider或分布式Spider) + 2. 为每个域名创建对应的令牌桶(本地或Redis) + 3. 提供统一的acquire接口 + """ + + def __init__(self): + """ + 初始化限速器 + + 自动检测是否使用Redis(判断是否为分布式爬虫) + """ + self.local_buckets: Dict[str, LocalTokenBucket] = {} # 本地令牌桶缓存 + self.redis_buckets: Dict[str, RedisTokenBucket] = {} # Redis令牌桶缓存 + self.redis_db = None # Redis连接(延迟初始化) + self.use_redis = self._should_use_redis() # 是否使用Redis + + def _should_use_redis(self) -> bool: + """ + 判断是否应该使用Redis + + Returns: + bool: True表示使用Redis(分布式爬虫),False表示使用本地内存(AirSpider) + """ + # 检查是否配置了Redis连接 + if hasattr(setting, "REDISDB_IP_PORTS") and setting.REDISDB_IP_PORTS: + return True + return False + + def _get_redis_db(self): + """获取Redis连接(单例模式)""" + if not self.redis_db: + self.redis_db = RedisDB() + return self.redis_db + + def _get_rate_limit_key(self, request, domain: str) -> str: + """ + 生成QPS限制的Redis key + + 格式: {redis_key}:h_rate_limit:{domain} + + Args: + request: 请求对象 + domain: 域名 + + Returns: + str: Redis key + """ + # 延迟导入避免循环依赖 + from feapder.network.request import Request + + # 获取redis_key + # 优先从Request类变量获取(分布式Spider) + redis_key = getattr(Request, "cached_redis_key", None) + + if not redis_key: + # AirSpider情况,使用parser_name + redis_key = getattr(request, "parser_name", None) or "default" + + # 使用setting中定义的模板 + return setting.TAB_RATE_LIMIT.format(redis_key=redis_key, domain=domain) + + def _get_local_bucket(self, domain: str, qps: int) -> LocalTokenBucket: + """ + 获取本地令牌桶(缓存) + + Args: + domain: 域名 + qps: QPS限制 + + Returns: + LocalTokenBucket: 本地令牌桶实例 + """ + cache_key = f"{domain}:{qps}" + + if cache_key not in self.local_buckets: + self.local_buckets[cache_key] = LocalTokenBucket(qps) + + return self.local_buckets[cache_key] + + def _get_redis_bucket(self, rate_limit_key: str, qps: int) -> RedisTokenBucket: + """ + 获取Redis令牌桶(缓存) + + Args: + rate_limit_key: Redis key + qps: QPS限制 + + Returns: + RedisTokenBucket: Redis令牌桶实例 + """ + cache_key = f"{rate_limit_key}:{qps}" + + if cache_key not in self.redis_buckets: + redis_db = self._get_redis_db() + self.redis_buckets[cache_key] = RedisTokenBucket( + redis_db, rate_limit_key, qps + ) + + return self.redis_buckets[cache_key] + + def acquire(self, request, domain: str, qps_limit: int) -> float: + """ + 尝试获取令牌(统一入口) + + 根据Spider类型自动选择本地或Redis令牌桶 + + Args: + request: 请求对象 + domain: 域名 + qps_limit: QPS限制 + + Returns: + float: 0表示成功,>0表示需要等待的秒数 + """ + if self.use_redis: + # 使用Redis分布式令牌桶 + rate_limit_key = self._get_rate_limit_key(request, domain) + bucket = self._get_redis_bucket(rate_limit_key, qps_limit) + else: + # 使用本地内存令牌桶 + bucket = self._get_local_bucket(domain, qps_limit) + + return bucket.acquire() diff --git a/test_qps_limit.py b/test_qps_limit.py new file mode 100644 index 00000000..24f6fc84 --- /dev/null +++ b/test_qps_limit.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +""" +QPS限制功能测试脚本 +测试域名级QPS限制是否正常工作 +""" + +import time +import feapder + + +class TestQpsSpider(feapder.AirSpider): + """测试QPS限制的爬虫""" + + __custom_setting__ = dict( + # 启用域名级QPS限制 + DOMAIN_RATE_LIMIT_ENABLE=True, + # 默认QPS设置为2(方便观察) + DOMAIN_RATE_LIMIT_DEFAULT=2, + # 特定域名的QPS规则 + DOMAIN_RATE_LIMIT_RULES={ + "httpbin.org": 1, # httpbin限制为1 QPS(每秒1个请求) + "www.baidu.com": 2, # 百度限制为2 QPS + }, + ) + + def start_requests(self): + """发送测试请求""" + print("\n========== 开始测试域名级QPS限制 ==========") + print(f"配置: httpbin.org = 1 QPS, www.baidu.com = 2 QPS") + print(f"测试开始时间: {time.strftime('%H:%M:%S')}\n") + + # 测试1: httpbin.org (1 QPS) + for i in range(3): + yield feapder.Request( + f"https://httpbin.org/delay/0?test={i}", + callback=self.parse_httpbin, + ) + + # 测试2: www.baidu.com (2 QPS) + for i in range(3): + yield feapder.Request( + f"https://www.baidu.com/s?wd=test{i}", + callback=self.parse_baidu, + ) + + def parse_httpbin(self, request, response): + """解析httpbin响应""" + current_time = time.strftime("%H:%M:%S") + print(f"[{current_time}] ✅ httpbin请求完成: {request.url}") + + def parse_baidu(self, request, response): + """解析百度响应""" + current_time = time.strftime("%H:%M:%S") + print(f"[{current_time}] ✅ 百度请求完成: {request.url}") + + +def test_local_token_bucket(): + """测试本地令牌桶""" + print("\n========== 测试本地令牌桶算法 ==========") + from feapder.utils.rate_limiter import LocalTokenBucket + + bucket = LocalTokenBucket(qps=2) # 2 QPS + print(f"创建令牌桶: 容量=2, QPS=2") + + # 测试1: 前2个请求应该立即通过 + print("\n测试1: 前2个请求应该立即通过") + for i in range(2): + wait_time = bucket.acquire() + print(f" 请求{i+1}: 等待时间 = {wait_time:.3f}秒 {'✅ 立即通过' if wait_time == 0 else '❌ 需要等待'}") + + # 测试2: 第3个请求需要等待 + print("\n测试2: 第3个请求需要等待") + wait_time = bucket.acquire() + print(f" 请求3: 等待时间 = {wait_time:.3f}秒 {'❌ 立即通过' if wait_time == 0 else '✅ 需要等待'}") + + # 测试3: 等待0.5秒后补充1个令牌 + print("\n测试3: 等待0.5秒后应该补充1个令牌") + time.sleep(0.5) + wait_time = bucket.acquire() + print(f" 请求4: 等待时间 = {wait_time:.3f}秒 {'✅ 立即通过' if wait_time == 0 else '❌ 需要等待'}") + + print("\n✅ 本地令牌桶测试完成") + + +def test_domain_extraction(): + """测试域名提取功能""" + print("\n========== 测试域名提取功能 ==========") + from feapder.network.request import Request + + test_urls = [ + "https://www.baidu.com/s?wd=test", + "https://baidu.com/s?wd=test", + "http://news.sina.com.cn/page.html", + "https://api.example.com/v1/data", + "https://example.com:8080/path", + ] + + for url in test_urls: + domain = Request._extract_domain(url) + print(f" {url} -> {domain}") + + print("\n✅ 域名提取测试完成") + + +if __name__ == "__main__": + print("\n" + "=" * 60) + print(" feapder 域名级QPS限制功能测试") + print("=" * 60) + + # 运行单元测试 + test_domain_extraction() + test_local_token_bucket() + + # 运行集成测试(可选,需要网络) + print("\n" + "=" * 60) + user_input = input("\n是否运行集成测试(需要网络连接)?[y/N]: ") + if user_input.lower() == "y": + TestQpsSpider(thread_count=1).start() + else: + print("跳过集成测试") + + print("\n" + "=" * 60) + print(" 测试完成!") + print("=" * 60 + "\n") From 3b47970d404f404dc121de6e615980900d449092 Mon Sep 17 00:00:00 2001 From: ShellMonster Date: Thu, 20 Nov 2025 11:41:58 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20=E5=B0=86=20DOMAIN=5FRATE=5FLIMIT=5F?= =?UTF-8?q?DEFAULT=20=E9=BB=98=E8=AE=A4=E5=80=BC=E6=94=B9=E4=B8=BA=200?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将默认 QPS 限制从 10 改为 0,表示未配置的域名默认不限制。 只有在 DOMAIN_RATE_LIMIT_RULES 中明确配置的域名才会被限制。 修改文件: - feapder/setting.py - feapder/templates/air_spider_template.tmpl - feapder/templates/spider_template.tmpl - feapder/templates/batch_spider_template.tmpl - feapder/templates/task_spider_template.tmpl --- feapder/setting.py | 2 +- feapder/templates/air_spider_template.tmpl | 2 +- feapder/templates/batch_spider_template.tmpl | 2 +- feapder/templates/spider_template.tmpl | 2 +- feapder/templates/task_spider_template.tmpl | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/feapder/setting.py b/feapder/setting.py index 6d4cbcc7..2895f7b3 100644 --- a/feapder/setting.py +++ b/feapder/setting.py @@ -230,7 +230,7 @@ # 域名级QPS限制配置 DOMAIN_RATE_LIMIT_ENABLE = False # 是否启用域名级QPS限制,默认关闭 -DOMAIN_RATE_LIMIT_DEFAULT = 10 # 默认每个域名的QPS限制(每秒请求数) +DOMAIN_RATE_LIMIT_DEFAULT = 0 # 默认不限制(0表示不限制,只有在RULES中配置的域名才会被限制) DOMAIN_RATE_LIMIT_RULES = {} # 特定域名的QPS规则,格式: {"baidu.com": 5, "*.google.com": 8} ############# 导入用户自定义的setting ############# diff --git a/feapder/templates/air_spider_template.tmpl b/feapder/templates/air_spider_template.tmpl index d7afd5a2..b8fe5425 100644 --- a/feapder/templates/air_spider_template.tmpl +++ b/feapder/templates/air_spider_template.tmpl @@ -15,7 +15,7 @@ class ${spider_name}(feapder.AirSpider): # __custom_setting__ = dict( # # 域名级QPS限制配置(推荐开启) # DOMAIN_RATE_LIMIT_ENABLE=True, # 是否启用QPS限制 - # DOMAIN_RATE_LIMIT_DEFAULT=10, # 默认每个域名10 QPS + # DOMAIN_RATE_LIMIT_DEFAULT=0, # 默认不限制(0表示不限制) # DOMAIN_RATE_LIMIT_RULES={ # 特定域名的QPS规则 # # "baidu.com": 5, # 百度限制5 QPS # # "api.example.com": 20, # API接口限制20 QPS diff --git a/feapder/templates/batch_spider_template.tmpl b/feapder/templates/batch_spider_template.tmpl index 6c308bca..25d413ab 100644 --- a/feapder/templates/batch_spider_template.tmpl +++ b/feapder/templates/batch_spider_template.tmpl @@ -24,7 +24,7 @@ class ${spider_name}(feapder.BatchSpider): MYSQL_USER_PASS="", # 域名级QPS限制配置(可选) # DOMAIN_RATE_LIMIT_ENABLE=True, - # DOMAIN_RATE_LIMIT_DEFAULT=10, + # DOMAIN_RATE_LIMIT_DEFAULT=0, # 默认不限制 # DOMAIN_RATE_LIMIT_RULES={ # "baidu.com": 5, # "*.google.com": 8, diff --git a/feapder/templates/spider_template.tmpl b/feapder/templates/spider_template.tmpl index f76ce7b4..ca4af26a 100644 --- a/feapder/templates/spider_template.tmpl +++ b/feapder/templates/spider_template.tmpl @@ -18,7 +18,7 @@ class ${spider_name}(feapder.Spider): REDISDB_DB=0, # 域名级QPS限制配置(可选,默认不启用) # DOMAIN_RATE_LIMIT_ENABLE=True, # 是否启用QPS限制 - # DOMAIN_RATE_LIMIT_DEFAULT=10, # 默认每个域名10 QPS + # DOMAIN_RATE_LIMIT_DEFAULT=0, # 默认不限制(0表示不限制) # DOMAIN_RATE_LIMIT_RULES={ # 特定域名的QPS规则 # "baidu.com": 5, # 百度主域名限制5 QPS # "api.baidu.com": 20, # 百度API限制20 QPS diff --git a/feapder/templates/task_spider_template.tmpl b/feapder/templates/task_spider_template.tmpl index ead313be..4243a9cc 100644 --- a/feapder/templates/task_spider_template.tmpl +++ b/feapder/templates/task_spider_template.tmpl @@ -24,7 +24,7 @@ class ${spider_name}(feapder.TaskSpider): MYSQL_USER_PASS="", # 域名级QPS限制配置(可选) # DOMAIN_RATE_LIMIT_ENABLE=True, - # DOMAIN_RATE_LIMIT_DEFAULT=10, + # DOMAIN_RATE_LIMIT_DEFAULT=0, # 默认不限制 # DOMAIN_RATE_LIMIT_RULES={ # "baidu.com": 5, # "*.google.com": 8, From 522535c462d1589fe3794c0fcca1eddb02a21152 Mon Sep 17 00:00:00 2001 From: ShellMonster Date: Thu, 20 Nov 2025 14:06:34 +0800 Subject: [PATCH 3/3] =?UTF-8?q?request.py=E4=B8=AD=E6=9C=AAimport=20time?= =?UTF-8?q?=EF=BC=8C=E8=A1=A5=E5=85=85=E5=BC=95=E5=85=A5=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- feapder/network/request.py | 1 + 1 file changed, 1 insertion(+) diff --git a/feapder/network/request.py b/feapder/network/request.py index ed333850..5cf8cd05 100644 --- a/feapder/network/request.py +++ b/feapder/network/request.py @@ -11,6 +11,7 @@ import copy import os import re +import time import requests from requests.cookies import RequestsCookieJar