from bs4 import BeautifulSoup, Comment
import re
import unicodedata
from typing import Dict, List

import aiohttp

# Substrings that flag navigation and page chrome. Matching is deliberately
# loose substring matching against id/class/role, so expect the occasional
# false positive (e.g. "ads" inside an unrelated class name).
BAD_CHUNKS = [
    "nav", "header", "footer", "aside", "form", "menu", "breadcrumb", "toc", "pagination",
    "subscribe", "advert", "ads", "promo", "social", "share", "comment", "related", "widget",
    "modal", "banner", "cookie", "newsletter", "disclaimer",
]

def normalize_ws(s: str) -> str:
    s = unicodedata.normalize("NFKC", s)
    s = re.sub(r"\s+", " ", s)
    return s.strip()

def clean_soup(soup: BeautifulSoup) -> None:
    # Drop elements that never carry article text.
    for el in soup(["script", "style", "noscript", "template", "svg", "canvas", "iframe"]):
        el.decompose()
    # Strip HTML comments.
    for c in soup.find_all(string=lambda t: isinstance(t, Comment)):
        c.extract()
    # Remove anything whose id/class/role looks like page chrome.
    for tag in soup.find_all(True):
        ident = " ".join([
            tag.get("id") or "",
            " ".join(tag.get("class") or []),
            tag.get("role") or "",
        ]).lower()
        if any(b in ident for b in BAD_CHUNKS):
            tag.decompose()

def heading_level(tag) -> int:
    # "h3" -> 3, etc.
    return int(tag.name[1])

def group_by_sections(soup) -> List[Dict]:
    sections = []
    for section in soup.find_all(["section", "article"]):
        # Use the first heading, if present, for the section title and level.
        heading = section.find(re.compile(r"^h[1-6]$"))
        title = normalize_ws(heading.get_text()) if heading else ""
        level = heading_level(heading) if heading else 2
        paragraphs = []
        for p in section.find_all("p"):
            txt = normalize_ws(p.get_text())
            if txt:
                paragraphs.append(txt)
        if paragraphs:
            # Paragraphs within a section are joined with blank lines; change as you prefer.
            sections.append({"title": title, "level": level, "content": "\n\n".join(paragraphs)})
    return sections

def table_to_markdown(table) -> str:
    # Simple HTML table to Markdown converter; treats the first row as the header.
    rows = []
    for tr in table.find_all("tr"):
        cols = [td.get_text(strip=True) for td in tr.find_all(["td", "th"])]
        rows.append(cols)
    md = ""
    if rows:
        md += "| " + " | ".join(rows[0]) + " |\n"
        md += "| " + " | ".join("---" for _ in rows[0]) + " |\n"
        for row in rows[1:]:
            md += "| " + " | ".join(row) + " |\n"
    return md

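# Quick sanity check for table_to_markdown (a sketch, not part of the module's API):
#
#   >>> html = "<table><tr><th>a</th><th>b</th></tr><tr><td>1</td><td>2</td></tr></table>"
#   >>> table_to_markdown(BeautifulSoup(html, "html.parser").table)
#   '| a | b |\n| --- | --- |\n| 1 | 2 |\n'
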
def group_by_headings(soup) -> List[Dict]:
    grouped = []
    for hdr in soup.find_all(re.compile(r"^h[1-6]$")):
        title = normalize_ws(hdr.get_text())
        buffer = []
        # Walk following siblings until a heading of the same or a higher level.
        # Caveats: only siblings are seen, so text wrapped in intermediate
        # containers is skipped, and text under deeper subheadings is collected
        # here as well as in the subheading's own group.
        for sib in hdr.find_next_siblings():
            if sib.name and re.match(r"^h[1-6]$", sib.name, re.I):
                if heading_level(sib) <= heading_level(hdr):
                    break
            if sib.name == "p":
                text = normalize_ws(sib.get_text())
                if text:
                    buffer.append(text)
            elif sib.name in ("ul", "ol"):
                for li in sib.find_all("li"):
                    text = normalize_ws(li.get_text())
                    if text:
                        buffer.append("• " + text)
        if buffer:
            grouped.append({"title": title, "level": heading_level(hdr), "content": "\n\n".join(buffer)})
    return grouped

def sections_to_markdown(sections: List[Dict]) -> str:
    lines: List[str] = []
    for s in sections:
        # Clamp the heading depth to valid Markdown (1-6 hashes); sections
        # without a heading default to level 2.
        hashes = "#" * max(1, min(6, s.get("level", 2)))
        if s["title"]:
            lines.append(f"{hashes} {s['title']}")
        lines.append(s["content"])
        lines.append("")
    out = "\n".join(lines).strip()
    return out + "\n" if out else out

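# For example, sections_to_markdown([{"title": "Intro", "level": 2,
# "content": "Hello."}]) returns "## Intro\nHello.\n".
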
def slugify(text: str, max_len: int = 80) -> str:
    text = unicodedata.normalize("NFKD", text)
    text = re.sub(r"[^\w\s-]", "", text).strip().lower()
    text = re.sub(r"[\s_-]+", "-", text)
    # Trim to max_len, dropping any hyphen left dangling by the truncation.
    return text[:max_len].rstrip("-") or "page"

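# e.g. slugify("Hello, World!") -> "hello-world"
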
async def fetch_and_extract_paragraphs(url) -> List[str]:
    paragraphs = []
    async with aiohttp.ClientSession() as session:
        async with session.get(str(url)) as response:
            response.raise_for_status()
            html = await response.text()
    soup = BeautifulSoup(html, "html.parser")
    # Reuse the shared scrubber instead of re-running an ad-hoc subset of it.
    clean_soup(soup)

    for p in soup.find_all("p"):
        txt = normalize_ws(p.get_text())
        if txt:
            paragraphs.append(txt)
    return paragraphs

async def fetch_and_extract_sections(url) -> List[Dict]:
    async with aiohttp.ClientSession() as session:
        async with session.get(str(url)) as response:
            response.raise_for_status()
            html = await response.text()
    soup = BeautifulSoup(html, "html.parser")
    clean_soup(soup)

    # Prefer explicit <section>/<article> grouping; fall back to headings.
    chunks = group_by_sections(soup)
    if not chunks:
        chunks = group_by_headings(soup)
    return chunks

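# Minimal usage sketch. The URL below is a placeholder assumption, not part
# of this module; point it at any page you want to extract.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        url = "https://example.com"  # hypothetical target page
        sections = await fetch_and_extract_sections(url)
        if sections:
            print(sections_to_markdown(sections))
        else:
            # Fall back to a flat paragraph dump if no sections were found.
            print("\n\n".join(await fetch_and_extract_paragraphs(url)))

    asyncio.run(_demo())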