Skip to content

Commit 1b9abd3

Browse files
author
ShellMonster
committed
feat: Item 支持指定 Pipeline 路由
- 新增 Item.__pipelines__ 属性,允许 Item 指定流向哪些 Pipeline - 支持大小写不敏感匹配(csv/CSV/Csv 都有效) - 未指定时流向所有 Pipeline(保持向后兼容) - 修改 ItemBuffer 逻辑,支持 Pipeline 过滤 使用示例: class ProductItem(Item): table_name = 'product' __pipelines__ = ['csv'] # 只流向 CSV Pipeline class UserItem(Item): table_name = 'user' __pipelines__ = ['mysql'] # 只流向 MySQL Pipeline class OrderItem(Item): table_name = 'order' __pipelines__ = ['csv', 'MySQL'] # 流向两者,大小写不敏感
1 parent 50915f3 commit 1b9abd3

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

feapder/buffer/item_buffer.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,14 @@ def __pick_items(self, items, is_update_item=False):
217217
将每个表之间的数据分开 拆分后 原items为空
218218
@param items:
219219
@param is_update_item:
220-
@return:
220+
@return: (datas_dict, pipelines_dict)
221221
"""
222222
datas_dict = {
223223
# 'table_name': [{}, {}]
224224
}
225+
pipelines_dict = {
226+
# 'table_name': ['csv', 'mysql'] or None
227+
}
225228

226229
while items:
227230
item = items.pop(0)
@@ -235,16 +238,26 @@ def __pick_items(self, items, is_update_item=False):
235238

236239
if table_name not in datas_dict:
237240
datas_dict[table_name] = []
241+
# 保存这个 table 的 pipelines 配置(只需保存一次)
242+
pipelines_dict[table_name] = getattr(item, '__pipelines__', None)
238243

239244
datas_dict[table_name].append(item.to_dict)
240245

241246
if is_update_item and table_name not in self._item_update_keys:
242247
self._item_update_keys[table_name] = item.update_key
243248

244-
return datas_dict
249+
return datas_dict, pipelines_dict
245250

246-
def __export_to_db(self, table, datas, is_update=False, update_keys=()):
251+
def __export_to_db(self, table, datas, is_update=False, update_keys=(), allowed_pipelines=None):
247252
for pipeline in self._pipelines:
253+
# 如果 item 指定了 pipelines,检查是否匹配(忽略大小写)
254+
if allowed_pipelines is not None:
255+
pipeline_name = pipeline.__class__.__name__.replace("Pipeline", "").lower()
256+
# 将用户指定的 pipeline 名称也转为小写进行比较
257+
allowed_pipelines_lower = [p.lower() for p in allowed_pipelines]
258+
if pipeline_name not in allowed_pipelines_lower:
259+
continue # 跳过不匹配的 pipeline
260+
248261
if is_update:
249262
if table == self._task_table and not isinstance(
250263
pipeline, MysqlPipeline
@@ -287,14 +300,15 @@ def __add_item_to_db(
287300
if setting.ITEM_FILTER_ENABLE:
288301
items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
289302

290-
# 分捡
291-
items_dict = self.__pick_items(items)
292-
update_items_dict = self.__pick_items(update_items, is_update_item=True)
303+
# 分捡(返回值包含 pipelines_dict)
304+
items_dict, items_pipelines = self.__pick_items(items)
305+
update_items_dict, update_pipelines = self.__pick_items(update_items, is_update_item=True)
293306

294307
# item批量入库
295308
failed_items = {"add": [], "update": [], "requests": []}
296309
while items_dict:
297310
table, datas = items_dict.popitem()
311+
allowed_pipelines = items_pipelines.get(table)
298312

299313
log.debug(
300314
"""
@@ -305,13 +319,14 @@ def __add_item_to_db(
305319
% (table, tools.dumps_json(datas, indent=16))
306320
)
307321

308-
if not self.__export_to_db(table, datas):
322+
if not self.__export_to_db(table, datas, allowed_pipelines=allowed_pipelines):
309323
export_success = False
310324
failed_items["add"].append({"table": table, "datas": datas})
311325

312326
# 执行批量update
313327
while update_items_dict:
314328
table, datas = update_items_dict.popitem()
329+
allowed_pipelines = update_pipelines.get(table)
315330

316331
log.debug(
317332
"""
@@ -324,7 +339,7 @@ def __add_item_to_db(
324339

325340
update_keys = self._item_update_keys.get(table)
326341
if not self.__export_to_db(
327-
table, datas, is_update=True, update_keys=update_keys
342+
table, datas, is_update=True, update_keys=update_keys, allowed_pipelines=allowed_pipelines
328343
):
329344
export_success = False
330345
failed_items["update"].append(

feapder/network/item.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __new__(cls, name, bases, attrs):
2020
attrs.setdefault("__name_underline__", None)
2121
attrs.setdefault("__update_key__", None)
2222
attrs.setdefault("__unique_key__", None)
23+
attrs.setdefault("__pipelines__", None)
2324

2425
return type.__new__(cls, name, bases, attrs)
2526

@@ -69,6 +70,7 @@ def to_dict(self):
6970
"__name_underline__",
7071
"__update_key__",
7172
"__unique_key__",
73+
"__pipelines__",
7274
):
7375
if key.startswith(f"_{self.__class__.__name__}"):
7476
key = key.replace(f"_{self.__class__.__name__}", "")
@@ -145,6 +147,7 @@ def to_UpdateItem(self):
145147

146148
class UpdateItem(Item):
147149
__update_key__ = []
150+
__pipelines__ = None
148151

149152
def __init__(self, **kwargs):
150153
super(UpdateItem, self).__init__(**kwargs)

0 commit comments

Comments
 (0)