Skip to content
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Django
SECRET_KEY=#
DEBUG=#

# redis
CELERY_BROKER_IP=#
REDIS_HOST=#

# PostgreSQL
DB_NAME=postgres
Expand Down
2 changes: 1 addition & 1 deletion core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@

from .celery import app as celery_app

__all__ = ["celery_app"]
__all__ = ("celery_app",)
10 changes: 8 additions & 2 deletions core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
from pathlib import Path

from celery.schedules import timedelta
from dotenv import load_dotenv

load_dotenv()
Expand All @@ -30,6 +31,7 @@
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = os.environ.get("DEBUG") == "True"


ALLOWED_HOSTS = (
[
os.environ.get("DOMAIN", "*"),
Expand Down Expand Up @@ -152,13 +154,17 @@


# Celery settings

CELERY_BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_BEAT_SCHEDULE = {
"send_orders_task": {
"task": "tracker.tasks.check_for_new_issues",
"schedule": timedelta(seconds=10),
}
}

# Custom app settings

DEFAULT_SCHEDULE_INTERVAL = 3600
18 changes: 18 additions & 0 deletions tracker/migrations/0002_telegramuser_is_allowed_notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.1.3 on 2024-11-21 19:32

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("tracker", "0001_initial"),
]

operations = [
migrations.AddField(
model_name="telegramuser",
name="is_allowed_notification",
field=models.BooleanField(default=False),
),
]
1 change: 1 addition & 0 deletions tracker/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class TelegramUser(AbstractModel):

user = models.OneToOneField(CustomUser, on_delete=models.CASCADE)
telegram_id = models.CharField(unique=True)
is_allowed_notification = models.BooleanField(default=False)

def __str__(self) -> str:
"""
Expand Down
68 changes: 65 additions & 3 deletions tracker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,74 @@
"""
A `tracker.tasks` module that contains all celery tasks.
"""

import json
import logging
import os

import redis
from asgiref.sync import async_to_sync
from celery import shared_task
from dotenv import load_dotenv

from tracker.models import Repository, TelegramUser
from tracker.telegram.bot import send_new_issue_notification, send_revision_messages
from tracker.utils import (
compare_two_repo_dicts,
get_existing_issues_for_subscribed_users,
get_user_revisions,
)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


load_dotenv()
# Connect to Redis


@shared_task()
def get_relevant_recipients() -> None:
"""
Retrieves a mapping of Telegram users subscribed for
new-issue-notifications to the repositories they are subscribed to.

:return: A dictionary where keys are Telegram user IDs, and values are lists of subscribed repository names.
"""
subscribed_users = (
TelegramUser.objects.filter(notify_about_new_issues=True).first().user
)
repositories = Repository.objects.filter(user__in=subscribed_users).values(
"author", "name"
)
existing_issues = get_existing_issues_for_subscribed_users(repositories)

cache = redis.Redis(
host=os.environ.get("REDIS_HOST"), port=6379, decode_responses=True
)
if not cache.exists("task_first_run_flag"):
cache.set("existing:issues", json.dumps(existing_issues))
return

cached_existing_issues = cache.get("existing:issues")
cached_existing_issues = json.loads(str(cached_existing_issues))
new_issues = compare_two_repo_dicts(existing_issues, cached_existing_issues)
repos_with_new_issues = [key for key in new_issues]

user_repo_map = {}
for telegram_user in subscribed_users:
repos = Repository.objects.filter(
user=telegram_user.user, name__in=repos_with_new_issues
)

logger.info(f"Telegram User: {telegram_user.telegram_id}")
for repo in repos:
if telegram_user.telegram_id not in user_repo_map:
user_repo_map[telegram_user.telegram_id] = []

user_repo_map[telegram_user.telegram_id].append(repo.name)

from .models import TelegramUser
from .telegram.bot import send_revision_messages
from .utils import get_user_revisions
async_to_sync(send_new_issue_notification)(user_repo_map, new_issues)


@shared_task
Expand Down
51 changes: 45 additions & 6 deletions tracker/telegram/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@

from aiogram import Bot, Dispatcher, F
from aiogram.client.default import DefaultBotProperties
from aiogram.filters import CommandObject, CommandStart
from aiogram.filters import Command, CommandObject, CommandStart
from aiogram.types.message import Message
from aiogram.utils.deep_linking import create_start_link
from aiogram.utils.keyboard import ReplyKeyboardBuilder, ReplyKeyboardMarkup
from dotenv import load_dotenv

from tracker import ISSUES_URL, PULLS_URL, get_issues_without_pull_requests
from tracker.models import TelegramUser
from tracker.telegram.templates import TEMPLATES
from tracker.utils import (
attach_link_to_issue,
create_telegram_user,
get_all_available_issues,
get_all_repostitories,
get_all_repositories,
get_contributor_issues,
get_repository_support,
get_support_link,
Expand Down Expand Up @@ -80,14 +81,40 @@ async def start_message(message: Message) -> None:
)


@dp.message(Command("notify_about_new_issues"))
async def subscribe_to_issue_notifications(msg: Message):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docstring

"""

Updates telegram user subscription status, and responds with the new subscription status.

:param msg: Message instance used to retrieve telegram id.
:return: None
"""
try:
telegram_user = TelegramUser.objects.filter(
telegram_id=msg.from_user.id
).first()
if not telegram_user:
await msg.answer(f"Telegram user with ID {msg.from_user.id} not found.")
return

telegram_user.is_subscribed = not telegram_user.is_subscribed
telegram_user.save(update_fields=["is_subscribed"])
status = "subscribed" if telegram_user.is_subscribed else "unsubscribed"
await msg.answer(f'Your status was successfully changed to "{status}"')

except Exception as e:
logger.info(f"During the execution, unexpected error occurred: {e}")


@dp.message(F.text == "📓get missed deadlines📓")
async def send_deprecated_issue_assignees(msg: Message) -> None:
"""
Sends information about assignees that missed the deadline.
:param msg: Message instance for communication with a user
:return: None
"""
all_repositories = await get_all_repostitories(msg.from_user.id)
all_repositories = await get_all_repositories(msg.from_user.id)

for repository in all_repositories:

Expand Down Expand Up @@ -130,7 +157,7 @@ async def send_available_issues(msg: Message) -> None:
:param msg: Message instance for communication with a user
:return: None
"""
all_repositories = await get_all_repostitories(msg.from_user.id)
all_repositories = await get_all_repositories(msg.from_user.id)

for repository in all_repositories:
repo_message = TEMPLATES.repo_header.substitute(
Expand All @@ -156,7 +183,19 @@ async def send_available_issues(msg: Message) -> None:

message = repo_message + issue_messages

await msg.reply(message, parse_mode="HTML")
await msg.reply(message)


async def send_new_issue_notification(
id_to_repos_map: dict[str, list], repo_to_issues_map: dict[str, list]
):
for tg_id, repos in id_to_repos_map.values():
for repo in repos:
message = f"There are new issues in {repo}!\n"
repo_issues = repo_to_issues_map[repo]
for issue in repo_issues:
message += f"<blockquote>{issue}</blockquote>"
await bot.send_message(tg_id, message)


@dp.message(F.text.contains("/issues "))
Expand Down Expand Up @@ -215,7 +254,7 @@ async def send_support_contacts(msg: Message) -> None:
:param msg: Message instance for communication with a user
:return: None
"""
all_repositories = await get_all_repostitories(msg.from_user.id)
all_repositories = await get_all_repositories(msg.from_user.id)

for repository in all_repositories:
repo_message = TEMPLATES.repo_header.substitute(
Expand Down
72 changes: 68 additions & 4 deletions tracker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DATETIME_FORMAT,
HEADERS,
ISSUES_SEARCH,
ISSUES_URL,
PULLS_REVIEWS_URL,
PULLS_URL,
SECONDS_IN_AN_HOUR,
Expand All @@ -38,7 +39,7 @@ def escape_html(text: str) -> str:


@sync_to_async
def get_all_repostitories(tele_id: str) -> list[dict]:
def get_all_repositories(tele_id: str) -> list[dict]:
"""
A function that returns a list of repositories asyncronously.
:param tele_id: str
Expand Down Expand Up @@ -272,9 +273,8 @@ def get_pull_reviews(url: str) -> list[dict]:
try:
response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
return response.json()

if response.ok:
return response.json()
except requests.exceptions.RequestException as e:
logger.info(e)
return []
Expand All @@ -286,7 +286,7 @@ def get_user_revisions(telegram_id: str) -> list[dict]:
:params tele_id: The TelegramUser id of the user
:return: A list of reviews for all the user repos open PRS
"""
repos = async_to_sync(get_all_repostitories)(telegram_id)
repos = async_to_sync(get_all_repositories)(telegram_id)
reviews_list = []
for repo in repos:
pulls = get_all_open_pull_requests(
Expand All @@ -302,6 +302,7 @@ def get_user_revisions(telegram_id: str) -> list[dict]:
pull_number=pull["number"],
)
)

if reviews_data:
return_data["reviews"] = reviews_data
reviews_list.append(return_data.copy())
Expand Down Expand Up @@ -342,6 +343,69 @@ def get_contributor_issues(
return []


def get_all_opened_issues(url: str) -> list[dict]:
"""
Retrieves all opened issues from the given URL.
:param url: An API endpoint for issues.
:return: A list of dictionaries representing opened issues.
"""
try:
response = requests.get(url, headers=HEADERS)
response.raise_for_status()
issues = response.json()

opened_issues = list(
filter(
lambda issue: issue.get("state") == "open"
and not issue.get("draft")
and not issue.get("pull_request"),
issues,
)
)

return opened_issues
except requests.exceptions.RequestException as e:
logger.info(e)
return []


def get_existing_issues_for_subscribed_users(
repositories: list[dict],
) -> dict[str, list[str]]:
"""
Retrieves open issues for a given list of repositories.

:param repositories: List of repositories with "author" and "name".
:return: Dictionary with repository names as keys and lists of open issue titles as values.
"""

repository_data = {}
for repository in repositories:
issues = get_all_opened_issues(
ISSUES_URL.format(
owner=repository.get("author", str()),
repo=repository.get("name", str()),
)
)
repository_data[repository.get("name", str())] = [
issue.get("title") for issue in issues
]
return repository_data


def compare_two_repo_dicts(
dict1: dict[str, list[str]], dict2: dict[str, list[str]]
) -> dict[str, list[str]]:
diff = {}
for key in dict1:
len_1 = len(dict1[key])
len_2 = len(dict2[key])
if len_1 > len_2:
new_issues = len_1 - len_2
diff.update({key: dict1[key][:new_issues]})
return diff


def attach_link_to_issue(issue: dict) -> str:
"""
Attaches the issue link to the issue title
Expand Down
Loading