#!/usr/bin/env python3
"""
Core web crawling engine with URL queue management, domain filtering,
rate limiting, and robust error handling.
"""
import time
import requests
from urllib.parse import urljoin, urlparse, urlunparse, parse_qsl, urlencode
from collections import deque
from typing import Set, List, Dict, Optional, Tuple

import tldextract
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)

from config import CrawlerConfig
from database import CrawlerDatabase
from robots_parser import RobotsPolicy


class WebCrawler:
    """Manages web crawling with depth tracking and domain filtering."""

    def __init__(self, db: CrawlerDatabase, config: CrawlerConfig):
        """
        Initialize the web crawler.

        Args:
            db: Database instance for storing crawl data
            config: Configuration object with crawl settings
        """
        self.db = db
        self.config = config

        # URL queue: (url, depth) tuples
        self.queue = deque()

        # Tracking sets
        self.seen_urls: Set[str] = set()
        self.crawled_urls: Set[str] = set()

        # Request session with custom headers
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': config.USER_AGENT,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        })

        # Domain filtering
        self.allowed_domains_set = set(config.ALLOWED_DOMAINS) if config.ALLOWED_DOMAINS else None

        # Robots policy and per-host delay tracking
        self.robots = RobotsPolicy(
            user_agent=config.USER_AGENT,
            fallback_delay=config.ROBOTS_FALLBACK_DELAY,
            enabled=config.ROBOTS_OBEY,
        )
        self._last_fetch_by_host: Dict[str, float] = {}

    def initialize(self, seed_urls: List[str]):
        """
        Initialize the crawler with seed URLs.

        Args:
            seed_urls: List of starting URLs to crawl
        """
        # Load already crawled URLs from database
        existing_urls = self.db.get_all_urls()
        self.crawled_urls.update(existing_urls)
        self.seen_urls.update(existing_urls)
        print(f"📊 Found {len(existing_urls)} URLs already in database")

        # Add seed URLs to queue
        for url in seed_urls:
            normalized_url = self._normalize_url(url)
            if normalized_url and normalized_url not in self.seen_urls:
                self.queue.append((normalized_url, 0))
                self.seen_urls.add(normalized_url)
                print(f"🌱 Added seed URL: {normalized_url}")

        # Optional: discover additional URLs from sitemaps/feeds
        try:
            from discovery_module import discover_urls
            for url in seed_urls:
                for durl in discover_urls(url)[:50]:  # limit to avoid explosion
                    nu = self._normalize_url(durl)
                    if nu and nu not in self.seen_urls and self._is_allowed_domain(nu):
                        self.queue.append((nu, 1))
                        self.seen_urls.add(nu)
        except Exception:
            pass
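    # If the optional discovery_module is unavailable or raises, the bare
    # except above swallows the error and the crawl proceeds with the seed
    # URLs alone.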

    def _normalize_url(self, url: str) -> Optional[str]:
        """
        Normalize a URL by removing the fragment, stripping tracking
        parameters, and dropping trailing slashes.
        """
        try:
            p = urlparse(url)
            path = p.path
            query = p.query
            # Optionally strip tracking params
            if self.config.STRIP_TRACKING_PARAMS and query:
                pairs = []
                for k, v in parse_qsl(query, keep_blank_values=True):
                    lk = k.lower()
                    if lk.startswith("utm_") or lk in {"fbclid", "gclid"}:
                        continue
                    pairs.append((k, v))
                query = urlencode(pairs, doseq=True)
            # Remove trailing slash except for the root path
            if path.endswith("/") and len(path) > 1:
                path = path[:-1]
            # The fragment is dropped when the URL is rebuilt here
            normalized = urlunparse((p.scheme, p.netloc, path, "", query, ""))
            return normalized
        except Exception as e:
            print(f"⚠️ Failed to normalize URL {url}: {e}")
            return None
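    # Illustrative example (assuming STRIP_TRACKING_PARAMS is enabled in config):
    # _normalize_url("https://example.com/docs/page/?utm_source=x&id=2#top")
    # -> "https://example.com/docs/page?id=2"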

    def _is_allowed_domain(self, url: str) -> bool:
        """
        Check if URL belongs to an allowed domain.

        Args:
            url: URL to check

        Returns:
            True if allowed, False otherwise
        """
        if not self.allowed_domains_set:
            # No domain restriction
            return True
        try:
            extracted = tldextract.extract(url)
            registered_domain = f"{extracted.domain}.{extracted.suffix}"
            return registered_domain in self.allowed_domains_set
        except Exception as e:
            print(f"⚠️ Failed to extract domain from {url}: {e}")
            return False
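    # Example: tldextract reduces "https://docs.example.co.uk/page" to the
    # registered domain "example.co.uk", so ALLOWED_DOMAINS entries should be
    # registered domains (e.g. "example.com"), not full hostnames.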

    def _should_skip_url(self, url: str) -> Tuple[bool, str]:
        """
        Determine if URL should be skipped based on extension or patterns.

        Args:
            url: URL to check

        Returns:
            Tuple of (should_skip: bool, reason: str)
        """
        parsed = urlparse(url)
        path = parsed.path.lower()

        # Enforce optional docs path prefix restriction
        docs_prefix = (self.config.DOCS_PATH_PREFIX or "").lower().strip()
        if docs_prefix and not path.startswith(docs_prefix):
            return True, f"outside docs prefix: {docs_prefix}"

        # Check file extensions
        for ext in self.config.SKIP_EXTENSIONS:
            if path.endswith(ext):
                return True, f"Binary/media file: {ext}"

        # Skip common non-content paths
        for pattern in self.config.SKIP_PATH_PATTERNS:
            if pattern in path:
                return True, f"Non-content path: {pattern}"

        return False, ""
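    # Illustrative example, assuming DOCS_PATH_PREFIX = "/docs" and ".pdf" in
    # SKIP_EXTENSIONS: "/blog/post" is skipped as outside the docs prefix, and
    # "/docs/manual.pdf" is skipped as a binary/media file.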

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.RequestException, requests.Timeout))
    )
    def _fetch_url(self, url: str) -> Optional[requests.Response]:
        """Fetch URL content with retry logic and polite delays."""
        try:
            # Polite per-host delay (robots or configured)
            host = urlparse(url).netloc
            robots_delay = self.robots.get_crawl_delay(url) if self.robots.enabled else 0.0
            delay = max(self.config.REQUEST_DELAY, robots_delay or 0.0)
            last = self._last_fetch_by_host.get(host, 0.0)
            if delay and last:
                to_sleep = delay - max(0.0, time.time() - last)
                if to_sleep > 0:
                    time.sleep(to_sleep)

            response = self.session.get(
                url,
                timeout=self.config.REQUEST_TIMEOUT,
                allow_redirects=True,
                stream=True,
            )
            self._last_fetch_by_host[host] = time.time()

            # Check content type
            content_type = response.headers.get('Content-Type', '').lower()
            if not any(ct in content_type for ct in ['text/html', 'application/xhtml']):
                print(f"⚠️ Skipping non-HTML content: {content_type}")
                return None

            # Check content size
            content_length = response.headers.get('Content-Length')
            if content_length and int(content_length) > self.config.MAX_CONTENT_SIZE:
                print(f"⚠️ Skipping large content: {content_length} bytes")
                return None

            # For HTML, get full content
            response.raise_for_status()
            return response
        except requests.Timeout:
            print(f"⏱️ Timeout fetching {url}")
            raise
        except requests.RequestException as e:
            print(f"❌ Request error fetching {url}: {e}")
            raise
        except Exception as e:
            print(f"❌ Unexpected error fetching {url}: {e}")
            return None
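    # Note: with the retry settings above, a failing request is attempted up to
    # three times with exponential backoff (roughly 2-10 s between attempts);
    # once the attempts are exhausted the failure propagates to crawl_url(),
    # which logs it and returns None.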

    def crawl_url(self, url: str, depth: int) -> Optional[Dict]:
        """
        Crawl a single URL and extract content and links.

        Args:
            url: URL to crawl
            depth: Current crawl depth

        Returns:
            Dictionary with crawl results or None if failed
        """
        # Check if URL should be skipped
        should_skip, reason = self._should_skip_url(url)
        if should_skip:
            print(f"⏭️ Skipping {url}: {reason}")
            return None

        # Fetch URL
        try:
            # Robots.txt allow check
            if self.robots.enabled and not self.robots.is_allowed(url):
                print(f"⏭️ Disallowed by robots.txt: {url}")
                return None

            response = self._fetch_url(url)
            if not response:
                return None

            # Extract final URL after redirects; fall back to the requested URL
            # if normalization fails
            final_url = self._normalize_url(response.url) or url

            return {
                'url': final_url,
                'original_url': url,
                'status_code': response.status_code,
                'content': response.text,
                'headers': dict(response.headers),
                'crawl_depth': depth
            }
        except Exception as e:
            print(f"❌ Failed to crawl {url}: {e}")
            return None

    def discover_links(self, base_url: str, html_content: str, current_depth: int) -> List[Tuple[str, int]]:
        """
        Discover and normalize links from HTML content.

        Args:
            base_url: Base URL for resolving relative links
            html_content: HTML content to parse
            current_depth: Current crawl depth

        Returns:
            List of (url, depth) tuples for discovered links
        """
        from bs4 import BeautifulSoup

        discovered = []
        try:
            soup = BeautifulSoup(html_content, 'lxml')
            for link in soup.find_all('a', href=True):
                href = link.get('href', '').strip()

                # Skip empty, javascript, mailto, tel, and fragment-only links
                if not href or href.startswith(('javascript:', 'mailto:', 'tel:', '#')):
                    continue

                # Resolve relative URLs
                absolute_url = urljoin(base_url, href)
                normalized_url = self._normalize_url(absolute_url)
                if not normalized_url:
                    continue

                # Check domain restriction
                if not self._is_allowed_domain(normalized_url):
                    continue

                # Check if already seen or should skip
                if normalized_url in self.seen_urls:
                    continue
                should_skip, _ = self._should_skip_url(normalized_url)
                if should_skip:
                    continue

                # Add to discovered links
                next_depth = current_depth + 1
                if next_depth <= self.config.MAX_DEPTH:
                    discovered.append((normalized_url, next_depth))
                    self.seen_urls.add(normalized_url)
        except Exception as e:
            print(f"⚠️ Error discovering links from {base_url}: {e}")
        return discovered
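    # Example: with MAX_DEPTH = 2, links found on a depth-2 page are dropped
    # because their next_depth of 3 exceeds the configured limit.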

    def run(self) -> int:
        """
        Run the crawl until the queue is empty or the page limit is reached.

        Returns:
            Number of pages successfully crawled
        """
        pages_crawled = 0
        print("\n🚀 Starting crawl...")
        print(f"📊 Queue size: {len(self.queue)}")
        print(f"📊 Already crawled: {len(self.crawled_urls)}")

        while self.queue and pages_crawled < self.config.MAX_PAGES:
            url, depth = self.queue.popleft()

            # Skip if already crawled
            if url in self.crawled_urls:
                continue

            print(f"\n🔍 [{pages_crawled + 1}/{self.config.MAX_PAGES}] Crawling (depth {depth}): {url}")

            # Crawl the URL
            result = self.crawl_url(url, depth)
            if result:
                # Generate a temporary slug from the URL to avoid collisions
                import hashlib
                url_hash = hashlib.sha256(result['url'].encode()).hexdigest()[:16]
                temp_slug = f"temp-{url_hash}"

                # Store in database (basic data for now)
                page_data = {
                    'title': '',  # Will be extracted by content processor
                    'slug': temp_slug,  # Temporary slug, will be regenerated during processing
                    'content': result['content'],
                    'crawl_depth': depth,
                    'metadata': {
                        'status_code': result['status_code'],
                        'original_url': result['original_url']
                    }
                }
                self.db.upsert_page(result['url'], page_data)

                # Discover new links
                new_links = self.discover_links(result['url'], result['content'], depth)
                if new_links:
                    print(f" 🔗 Discovered {len(new_links)} new links")
                    self.queue.extend(new_links)

                self.crawled_urls.add(result['url'])
                pages_crawled += 1

            # Rate limiting
            time.sleep(self.config.REQUEST_DELAY)

        print(f"\n✅ Crawl completed: {pages_crawled} pages")
        return pages_crawled


if __name__ == "__main__":
    # Test crawler
    from pathlib import Path
    from config import CrawlerConfig

    # Override config for testing
    CrawlerConfig.SEED_URLS = ["https://example.com"]
    CrawlerConfig.MAX_PAGES = 3
    CrawlerConfig.MAX_DEPTH = 1
    CrawlerConfig.REQUEST_DELAY = 2.0

    test_db = Path("test_crawler.db")
    db = CrawlerDatabase(test_db)
    crawler = WebCrawler(db, CrawlerConfig)
    crawler.initialize(CrawlerConfig.SEED_URLS)
    pages = crawler.run()
    print(f"\n📊 Crawled {pages} pages")

    # Show statistics
    stats = db.get_crawl_statistics()
    print(f"📊 Statistics: {stats}")

    # Cleanup
    test_db.unlink()
    print("✅ Crawler test completed")