Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies = [
"openpyxl==3.1.5",
"aiohttp==3.13.3",
"numpy==2.2.6",
"scikit-learn>=1.4",
"langdetect==1.0.9",
"textual[syntax]>=1.0.0",
"claude-agent-sdk==0.1.52",
Expand Down
81 changes: 60 additions & 21 deletions src/services/trend_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import annotations

import asyncio
import logging
from collections import Counter
from dataclasses import dataclass

from sklearn.feature_extraction.text import TfidfVectorizer

from src.database import Database

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -44,47 +48,82 @@ class PeakHour:
class TrendService:
"""Trend analysis over collected messages."""

_TOPIC_BATCH_SIZE = 5000
_MAX_TOPIC_DOCUMENTS = 10000

def __init__(self, db: Database) -> None:
    """Initialize the service.

    Args:
        db: Database handle used by every trend query in this service.
    """
    self._db = db

async def get_trending_topics(self, days: int = 7, limit: int = 20) -> list[TrendingTopic]:
    """Return top keywords ranked by TF-IDF from recent messages.

    Builds a corpus from the most recent non-empty messages of the last
    ``days`` days, capped at ``_MAX_TOPIC_DOCUMENTS`` rows fetched in
    batches of ``_TOPIC_BATCH_SIZE``, then ranks keywords in a worker
    thread so the event loop is never blocked by the CPU-bound TF-IDF
    computation. Counts in the result are raw mention frequencies.

    Args:
        days: Look-back window in days.
        limit: Maximum number of topics to return; non-positive yields [].
    """
    if limit <= 0:
        return []

    corpus: list[str] = []
    offset = 0
    while True:
        remaining = self._MAX_TOPIC_DOCUMENTS - len(corpus)
        if remaining <= 0:
            break
        fetch_size = min(self._TOPIC_BATCH_SIZE, remaining)
        rows = await self._db.execute_fetchall(
            """
            SELECT text FROM messages
            WHERE date >= date('now', ?)
            AND COALESCE(TRIM(text), '') <> ''
            ORDER BY date DESC, id DESC
            LIMIT ? OFFSET ?
            """,
            (f"-{days} days", fetch_size, offset),
        )
        if not rows:
            break
        corpus.extend(row["text"] or "" for row in rows)
        if len(rows) < fetch_size:
            # Short page: the table is exhausted for this window.
            break
        offset += len(rows)

    if not corpus:
        return []

    if len(corpus) == self._MAX_TOPIC_DOCUMENTS:
        logger.info(
            "Capped trending-topic corpus at %d most recent messages",
            self._MAX_TOPIC_DOCUMENTS,
        )

    # TF-IDF fitting is CPU-bound; run it off the event loop.
    return await asyncio.to_thread(self._rank_trending_topics, corpus, limit)

def _rank_trending_topics(
    self,
    texts: list[str],
    limit: int,
    *,
    token_pattern: str = r"(?u)\b[а-яёa-z]{4,}\b",
    max_df: float = 0.85,
    min_df: int = 2,
) -> list[TrendingTopic]:
    """Rank keywords in *texts* by summed TF-IDF score.

    CPU-bound; callers on the event loop should invoke this via
    ``asyncio.to_thread`` (as ``get_trending_topics`` does).

    Args:
        texts: Message bodies forming the corpus.
        limit: Maximum number of topics to return.
        token_pattern: Regex selecting tokens; default keeps 4+ letter
            Russian/English words only.
        max_df: Drop terms appearing in more than this fraction of
            documents (near-ubiquitous noise words).
        min_df: Drop terms appearing in fewer than this many documents.

    Returns:
        Up to ``limit`` topics, highest TF-IDF first. Each topic's
        ``count`` is the raw mention frequency so callers can keep
        rendering the results as "mentions".
    """
    vectorizer = TfidfVectorizer(
        token_pattern=token_pattern,
        max_df=max_df,
        min_df=min_df,
    )
    try:
        tfidf_matrix = vectorizer.fit_transform(texts)
    except ValueError:
        # Raised when df pruning leaves an empty vocabulary
        # (e.g. a tiny or fully-uniform corpus).
        return []

    feature_names = vectorizer.get_feature_names_out()
    # Collapse the document×term matrix into one aggregate score per term.
    scores = tfidf_matrix.sum(axis=0).A1

    # Recount raw mentions with the vectorizer's own analyzer so the
    # counts use exactly the same tokenization as the keywords.
    mention_counts: Counter[str] = Counter()
    analyzer = vectorizer.build_analyzer()
    for text in texts:
        mention_counts.update(analyzer(text))

    topics: list[TrendingTopic] = []
    for index in scores.argsort()[::-1]:
        keyword = feature_names[index]
        topics.append(TrendingTopic(keyword=keyword, count=mention_counts[keyword]))
        if len(topics) >= limit:
            break
    return topics

async def get_trending_channels(self, days: int = 7, limit: int = 10) -> list[TrendingChannel]:
"""Return channels with the highest average views in the last N days."""
Expand Down
105 changes: 69 additions & 36 deletions tests/test_trend_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,92 +44,105 @@ async def test_get_trending_topics_empty_db(service, mock_db):


@pytest.mark.asyncio
async def test_get_trending_topics_filters_high_frequency(service, mock_db):
    """get_trending_topics filters out words appearing in >85% of messages via max_df."""
    # "hello" is in all 10 messages (100% > max_df 85%) → filtered out;
    # "topic" is only in the first 7 (70% < 85%) → kept.
    messages = [
        {"text": f"hello world testing {i} topic" if i < 7 else f"hello world testing {i}"}
        for i in range(10)
    ]
    mock_db.execute_fetchall = AsyncMock(return_value=messages)

    result = await service.get_trending_topics(days=7, limit=20)

    keywords = [t.keyword for t in result]
    assert "hello" not in keywords  # ubiquitous → suppressed by max_df
    assert "topic" in keywords      # below the max_df ceiling → kept


@pytest.mark.asyncio
async def test_get_trending_topics_respects_limit(service, mock_db):
    """get_trending_topics respects the limit parameter."""
    keywords = [
        "alpha",
        "bravo",
        "charlie",
        "delta",
        "echo",
        "foxtrot",
        "golf",
        "hotel",
        "india",
        "juliet",
    ]
    # Each keyword twice so it survives min_df=2; "commonword" is in every
    # message and therefore gets pruned by max_df.
    messages = [{"text": f"{keyword} commonword"} for keyword in keywords for _ in range(2)]
    mock_db.execute_fetchall = AsyncMock(return_value=messages)

    result = await service.get_trending_topics(days=7, limit=5)

    assert len(result) == 5
    assert {topic.keyword for topic in result}.issubset(set(keywords))


@pytest.mark.asyncio
async def test_get_trending_topics_short_words_filtered(service, mock_db):
    """get_trending_topics filters words shorter than 4 characters."""
    rows = [
        {"text": "abc test important verify"},
        {"text": "abc verify signal"},
        {"text": "abc verify insight"},
        {"text": "signal topic insight"},
    ]
    mock_db.execute_fetchall = AsyncMock(return_value=rows)

    result = await service.get_trending_topics(days=7, limit=20)

    returned = {topic.keyword for topic in result}
    # "abc" is only 3 chars — below the token pattern's minimum length.
    assert "abc" not in returned
    assert "verify" in returned


@pytest.mark.asyncio
async def test_get_trending_topics_with_data(service, mock_db):
    """get_trending_topics processes and returns topics correctly."""
    rows = [
        {"text": "важная важная тема здесь"},
        {"text": "другая важная тема"},
        {"text": "сегодня тема обзор"},
    ]
    mock_db.execute_fetchall = AsyncMock(return_value=rows)

    result = await service.get_trending_topics(days=7, limit=10)

    assert all(isinstance(t, TrendingTopic) for t in result)
    keywords = [t.keyword for t in result]
    assert "важная" in keywords
    # Raw mention count: twice in the first row, once in the second.
    important = next((t for t in result if t.keyword == "важная"), None)
    assert important is not None
    assert important.count == 3


@pytest.mark.asyncio
async def test_get_trending_topics_non_alpha_only(service, mock_db):
    """get_trending_topics only includes alphabetic words (4+ chars, RU/EN)."""
    rows = [
        {"text": "keepword 123numeric"},
        {"text": "keepword test123"},
        {"text": "другоеслово 123numeric"},
        {"text": "иноеслово test123"},
    ]
    mock_db.execute_fetchall = AsyncMock(return_value=rows)

    result = await service.get_trending_topics(days=7, limit=10)

    words = [t.keyword for t in result]
    # Tokens containing digits never match the letters-only pattern.
    assert "123numeric" not in words
    assert "test123" not in words
    assert "keepword" in words


@pytest.mark.asyncio
Expand Down Expand Up @@ -365,3 +378,23 @@ def test_peak_hour_dataclass():

assert peak.hour == 12
assert peak.count == 5


# === Regression tests ===


@pytest.mark.asyncio
async def test_get_trending_topics_tfidf_suppresses_noise(service, mock_db):
    """Regression #329: words in every message should not appear in trends."""
    # "hello" is in all 10 messages → pruned by max_df;
    # "криптовалюта" is in 2/10 → high IDF → should rank in the top.
    messages = [{"text": f"hello world testing {i}"} for i in range(10)]
    for idx in (0, 1):
        messages[idx]["text"] += " криптовалюта"
    mock_db.execute_fetchall = AsyncMock(return_value=messages)

    result = await service.get_trending_topics(days=7, limit=20)

    found = [t.keyword for t in result]
    assert "криптовалюта" in found
    assert "hello" not in found  # in 100% of documents → filtered by max_df
Loading