Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
544 changes: 544 additions & 0 deletions docs/csv_pipeline.md

Large diffs are not rendered by default.

144 changes: 144 additions & 0 deletions examples/csv_pipeline_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-
"""
Created on 2025-10-16
---------
@summary: CSV Pipeline 使用示例
---------
@author: 道长
@email: ctrlf4@yeah.net

演示如何使用 CsvPipeline 将爬虫数据保存为 CSV 文件。
"""

import feapder
from feapder.network.item import Item


# 定义数据项目
class ProductItem(Item):
    """Item describing a single product row.

    ``table_name`` selects the destination table; with CsvPipeline enabled
    the rows are appended to ``product.csv``.
    """

    table_name = "product"  # destination table -> data/csv/product.csv

    def clean(self):
        """Optional hook for normalizing field values before export."""


class CsvPipelineSpider(feapder.AirSpider):
    """
    Spider demonstrating the CSV pipeline with a single table.

    NOTE: CsvPipeline must be enabled in setting.py:
        ITEM_PIPELINES = [
            ...,
            "feapder.pipelines.csv_pipeline.CsvPipeline",
        ]
    """

    def start_requests(self):
        """Emit the seed request."""
        # Example data is generated in parse(); no real page is scraped.
        yield feapder.Request("https://example.com/products")

    def parse(self, request, response):
        """
        Parse callback.

        A real spider would extract fields from the HTML response; here we
        fabricate ten product rows to exercise the CSV storage path.
        """
        for seq in range(1, 11):
            product = ProductItem()
            product.id = seq
            product.name = f"商品_{seq}"
            product.price = 99.99 + (seq - 1)
            product.category = "电子产品"
            product.url = f"https://example.com/product/{seq}"
            yield product


class CsvPipelineSpiderWithMultiTables(feapder.AirSpider):
    """
    Spider demonstrating multi-table output with the CSV pipeline.

    CsvPipeline writes one CSV file per distinct ``table_name``, so this
    spider produces both product.csv and user.csv.
    """

    def start_requests(self):
        """Emit one seed request per table."""
        yield feapder.Request("https://example.com/products")
        yield feapder.Request("https://example.com/users")

    def parse(self, request, response):
        """Route by URL and yield items for the matching table."""

        if "/products" in request.url:
            # Product-table rows
            for i in range(5):
                item = ProductItem()
                item.id = i + 1
                item.name = f"商品_{i + 1}"
                item.price = 99.99 + i
                item.category = "电子产品"
                item.url = request.url

                yield item

        elif "/users" in request.url:
            # User-table rows.
            # BUG FIX: build a FRESH Item on every iteration. The original
            # created a single Item before the loop and yielded that same
            # object five times while mutating it; because the item buffer
            # batches items before serializing them, all five rows ended up
            # holding the last user's values.
            for i in range(5):
                user_item = Item()
                user_item.table_name = "user"
                user_item.id = i + 1
                user_item.username = f"user_{i + 1}"
                user_item.email = f"user_{i + 1}@example.com"
                user_item.created_at = "2024-10-16"

                yield user_item


# Configuration notes — the triple-quoted string below is documentation only;
# it is evaluated and discarded at import time and never used at runtime.
"""
使用CSV Pipeline需要的配置步骤:

1. 在 feapder/setting.py 中启用 CsvPipeline:

ITEM_PIPELINES = [
    "feapder.pipelines.mysql_pipeline.MysqlPipeline",  # 保持MySQL
    "feapder.pipelines.csv_pipeline.CsvPipeline",     # 新增CSV
]

2. CSV文件会自动保存到 data/csv/ 目录下:
   - product.csv: 商品表数据
   - user.csv: 用户表数据
   - 等等...

3. CSV文件会自动包含表头(首次创建时)

4. 如果爬虫中断后重新启动,CSV数据会继续追加
   (支持断点续爬)

性能特点:
- 每批数据最多1000条(由 ITEM_UPLOAD_BATCH_MAX_SIZE 控制)
- 每秒最多1000条,或等待1秒触发批处理
- 使用Per-Table Lock,确保单表写入安全
- 通过 fsync 确保数据落盘,不会丢失

注意事项:
- CSV文件本身不支持真正的UPDATE操作
- 如果有重复数据,可在应用层处理或启用 ITEM_FILTER_ENABLE
- 如果需要真正的UPDATE操作,建议配合MySQL或MongoDB使用
"""


if __name__ == "__main__":
    # Run the single-table spider example
    CsvPipelineSpider().start()

    # Or run the multi-table example instead
    # CsvPipelineSpiderWithMultiTables().start()
31 changes: 23 additions & 8 deletions feapder/buffer/item_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,14 @@ def __pick_items(self, items, is_update_item=False):
将每个表之间的数据分开 拆分后 原items为空
@param items:
@param is_update_item:
@return:
@return: (datas_dict, pipelines_dict)
"""
datas_dict = {
# 'table_name': [{}, {}]
}
pipelines_dict = {
# 'table_name': ['csv', 'mysql'] or None
}

while items:
item = items.pop(0)
Expand All @@ -235,16 +238,26 @@ def __pick_items(self, items, is_update_item=False):

if table_name not in datas_dict:
datas_dict[table_name] = []
# 保存这个 table 的 pipelines 配置(只需保存一次)
pipelines_dict[table_name] = getattr(item, '__pipelines__', None)

datas_dict[table_name].append(item.to_dict)

if is_update_item and table_name not in self._item_update_keys:
self._item_update_keys[table_name] = item.update_key

return datas_dict
return datas_dict, pipelines_dict

def __export_to_db(self, table, datas, is_update=False, update_keys=()):
def __export_to_db(self, table, datas, is_update=False, update_keys=(), allowed_pipelines=None):
for pipeline in self._pipelines:
# 如果 item 指定了 pipelines,检查是否匹配(忽略大小写)
if allowed_pipelines is not None:
pipeline_name = pipeline.__class__.__name__.replace("Pipeline", "").lower()
# 将用户指定的 pipeline 名称也转为小写进行比较
allowed_pipelines_lower = [p.lower() for p in allowed_pipelines]
if pipeline_name not in allowed_pipelines_lower:
continue # 跳过不匹配的 pipeline

if is_update:
if table == self._task_table and not isinstance(
pipeline, MysqlPipeline
Expand Down Expand Up @@ -287,14 +300,15 @@ def __add_item_to_db(
if setting.ITEM_FILTER_ENABLE:
items, items_fingerprints = self.__dedup_items(items, items_fingerprints)

# 分捡
items_dict = self.__pick_items(items)
update_items_dict = self.__pick_items(update_items, is_update_item=True)
# 分捡(返回值包含 pipelines_dict)
items_dict, items_pipelines = self.__pick_items(items)
update_items_dict, update_pipelines = self.__pick_items(update_items, is_update_item=True)

# item批量入库
failed_items = {"add": [], "update": [], "requests": []}
while items_dict:
table, datas = items_dict.popitem()
allowed_pipelines = items_pipelines.get(table)

log.debug(
"""
Expand All @@ -305,13 +319,14 @@ def __add_item_to_db(
% (table, tools.dumps_json(datas, indent=16))
)

if not self.__export_to_db(table, datas):
if not self.__export_to_db(table, datas, allowed_pipelines=allowed_pipelines):
export_success = False
failed_items["add"].append({"table": table, "datas": datas})

# 执行批量update
while update_items_dict:
table, datas = update_items_dict.popitem()
allowed_pipelines = update_pipelines.get(table)

log.debug(
"""
Expand All @@ -324,7 +339,7 @@ def __add_item_to_db(

update_keys = self._item_update_keys.get(table)
if not self.__export_to_db(
table, datas, is_update=True, update_keys=update_keys
table, datas, is_update=True, update_keys=update_keys, allowed_pipelines=allowed_pipelines
):
export_success = False
failed_items["update"].append(
Expand Down
3 changes: 3 additions & 0 deletions feapder/network/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __new__(cls, name, bases, attrs):
attrs.setdefault("__name_underline__", None)
attrs.setdefault("__update_key__", None)
attrs.setdefault("__unique_key__", None)
attrs.setdefault("__pipelines__", None)

return type.__new__(cls, name, bases, attrs)

Expand Down Expand Up @@ -69,6 +70,7 @@ def to_dict(self):
"__name_underline__",
"__update_key__",
"__unique_key__",
"__pipelines__",
):
if key.startswith(f"_{self.__class__.__name__}"):
key = key.replace(f"_{self.__class__.__name__}", "")
Expand Down Expand Up @@ -145,6 +147,7 @@ def to_UpdateItem(self):

class UpdateItem(Item):
__update_key__ = []
__pipelines__ = None

def __init__(self, **kwargs):
super(UpdateItem, self).__init__(**kwargs)
Expand Down
Loading