-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathbackground_processing.py
More file actions
182 lines (147 loc) · 5.93 KB
/
background_processing.py
File metadata and controls
182 lines (147 loc) · 5.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from datetime import datetime, timedelta
import random
import pandas as pd
import requests
import yaml
from utils.logger import logger
from utils.mongo_handler import MongoDBHandler, requestItemProcessing
# --- Configuration bootstrap ---
def load_config(config_path: str):
    """Read a YAML configuration file and return its parsed contents."""
    with open(config_path, 'r') as cfg_file:
        parsed = yaml.safe_load(cfg_file)
    return parsed


config = load_config('config.yaml')
algo_version = config['version']['algo_version']
mongo_handler = MongoDBHandler()
def fetch_qid_by_label(label):
    """
    Fetch the Wikidata QID for a given English label using SPARQL.

    Args:
        label: The English label to look up (e.g., an article title).

    Returns:
        The QID string (e.g., "Q42") of the first matching item, or
        None if no match was found or the request failed.
    """
    url = "https://query.wikidata.org/sparql"
    # Escape backslashes and double quotes so the label cannot break out of
    # the quoted SPARQL string literal (titles may legitimately contain quotes).
    safe_label = label.replace("\\", "\\\\").replace('"', '\\"')
    # LIMIT 1: only the first binding is ever used, so don't ask for more.
    query = f"""
    SELECT ?item WHERE {{
      ?item rdfs:label "{safe_label}"@en.
    }} LIMIT 1
    """
    headers = {
        "User-Agent": "ProVe/1.1.0 (jongmo.kim@kcl.ac.uk)",
        "Accept": "application/sparql-results+json"
    }
    # Timeout keeps a stalled endpoint from hanging the whole batch job.
    response = requests.get(url, params={"query": query}, headers=headers, timeout=30)
    if response.status_code == 200:
        data = response.json()
        results = data.get("results", {}).get("bindings", [])
        if results:
            # Item URI looks like .../entity/Q42 -> extract the trailing QID.
            return results[0]["item"]["value"].split("/")[-1]
        return None  # No QID found
    else:
        logger.error(f"Error fetching QID for label {label}: {response.text}")
        return None
def fetch_top_pageviews_and_qid(project, access, year, month, day, limit=10):
    """
    Fetches the top viewed pages and their QIDs.

    Args:
        project: The Wikipedia project (e.g., "en.wikipedia").
        access: The access method (e.g., "all-access").
        year: The year of the data.
        month: The month of the data.
        day: The day of the data.
        limit: The maximum number of articles to return.

    Returns:
        List of tuples containing (title, views, QID); QID may be None when
        no match was found. Returns None if the pageviews request failed or
        the response contained no data.
    """
    url = f"https://wikimedia.org/api/rest_v1/metrics/pageviews/top/{project}/{access}/{year}/{month}/{day}"
    headers = {
        "User-Agent": "ProVe/1.1.0 (jongmo.kim@kcl.ac.uk)"
    }
    # Timeout keeps a stalled endpoint from hanging the whole batch job.
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code == 200:
        data = response.json()
        items = data.get('items', [])
        if not items:
            # Guard: original code indexed [0] unconditionally and would
            # raise IndexError on an empty 'items' list.
            logger.error("Pageviews response contained no items.")
            return None
        articles = items[0].get('articles', [])
        top_articles = []
        for article in articles:  # Iterate through all articles
            title = article['article'].replace("_", " ")  # Replace underscores with spaces
            # Exclude aggregate/non-article pages BEFORE the QID lookup, so
            # we don't waste a SPARQL request on titles we will skip anyway.
            if title in ["Main Page", "Special:Search"]:
                logger.info(f"Excluding title: {title}")
                continue
            views = article['views']
            qid = fetch_qid_by_label(title)  # Use the correct function to fetch QID
            # Debugging output
            if qid is None:
                logger.info(f"QID not found for title: {title}")
            top_articles.append((title, views, qid))
            # Stop if we have reached the desired limit
            if len(top_articles) >= limit:
                break
        return top_articles
    else:
        logger.error(f"Error fetching top pageviews: {response.text}")
        return None
def process_top_viewed_items(project="en.wikipedia", access="all-access", limit=5):
    """
    Process the top viewed items from yesterday and queue them for processing.

    Args:
        project: The Wikipedia project (e.g., "en.wikipedia").
        access: The access method (e.g., "all-access").
        limit: The maximum number of articles to queue.
    """
    from datetime import timezone  # local import keeps this fix self-contained

    # Use an aware UTC datetime: datetime.utcnow() is deprecated (Python 3.12+)
    # and returns a naive object that is easy to misuse.
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    year = yesterday.strftime("%Y")
    month = yesterday.strftime("%m")
    day = yesterday.strftime("%d")
    # Fetch top viewed items for yesterday's date.
    top_items = fetch_top_pageviews_and_qid(project, access, year, month, day, limit)
    if top_items:
        logger.info("Top viewed items from yesterday:")
        for idx, (title, views, qid) in enumerate(top_items, 1):
            logger.info(f"{idx}. Title: {title} - {views} views (QID: {qid})")
            # Queue each item for processing; skip entries with no resolved QID.
            if qid:
                result = requestItemProcessing(qid, 'top_viewed')
                logger.info(f" Queue status: {result}")
    else:
        logger.info("No articles found.")
def process_pagepile_list(file_path='utils/pagepileList.txt'):
    """
    Queue every QID listed in the pagepile file for processing.

    Args:
        file_path: The path to the pagepile list file (one QID per line).
    """
    try:
        with open(file_path, 'r') as handle:
            entries = handle.read().splitlines()
        for qid in entries:
            if not qid:  # skip blank lines
                continue
            result = requestItemProcessing(qid, 'pagepile_weekly_update')
            logger.info(f"Queued QID {qid} for processing: {result}")
    except Exception as e:
        logger.error(f"Error processing pagepile list: {e}")
def process_system_qid(qid: str) -> None:
    """
    Queue a system QID for processing.

    A bare numeric string (e.g. "42") is normalized to QID form ("Q42")
    before queueing.

    Args:
        qid: The QID to process, either "Q<number>" or a plain numeric string.

    Raises:
        ValueError: If the QID neither starts with 'Q' nor is a valid integer.
    """
    if not qid.startswith('Q'):
        try:
            int(qid)  # Accept plain numeric IDs and normalize them below.
            qid = f"Q{qid}"
        except ValueError as e:
            # Message fixed: input is caller-supplied, not "generated".
            raise ValueError(
                "QID must start with 'Q' or be a numeric string."
            ) from e
    # Queue the QID for processing via the random-processing queue.
    result = requestItemProcessing(
        qid=qid,
        algo_version=algo_version,
        request_type='Random_processing',
        queue=mongo_handler.random_collection,
        save_function=mongo_handler.random_collection.insert_one
    )
    logger.info(f"Queued random QID {qid} for processing: {result}")
if __name__ == "__main__":
    # Enable whichever background task should run:
    # process_top_viewed_items(limit=300)  # top viewed items
    process_pagepile_list()  # QIDs from the pagepile list
    # process_random_qid()  # a single random QID