diff --git a/migrations/versions/a9c4bf6370de_soft_deletes.py b/migrations/versions/a9c4bf6370de_soft_deletes.py new file mode 100644 index 0000000..0d1a07c --- /dev/null +++ b/migrations/versions/a9c4bf6370de_soft_deletes.py @@ -0,0 +1,45 @@ +"""soft-deletes + +Revision ID: a9c4bf6370de +Revises: a68c6bb2972c +Create Date: 2025-10-11 01:33:01.717991 + +""" + +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision = 'a9c4bf6370de' +down_revision = 'a68c6bb2972c' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('file', sa.Column('is_deleted', sa.Boolean(), server_default='false', nullable=False)) + op.alter_column('file', 'option_pages', existing_type=sa.VARCHAR(), nullable=False) + op.alter_column('file', 'option_copies', existing_type=sa.INTEGER(), nullable=False) + op.alter_column('file', 'option_two_sided', existing_type=sa.BOOLEAN(), nullable=False) + op.alter_column('file', 'number_of_pages', existing_type=sa.INTEGER(), nullable=False) + op.alter_column('file', 'source', existing_type=sa.VARCHAR(), nullable=False) + op.add_column( + 'print_fact', sa.Column('is_deleted', sa.Boolean(), server_default='false', nullable=False) + ) + op.alter_column('print_fact', 'sheets_used', existing_type=sa.INTEGER(), nullable=False) + op.add_column( + 'union_member', sa.Column('is_deleted', sa.Boolean(), server_default='false', nullable=False) + ) + + +def downgrade(): + op.drop_column('union_member', 'is_deleted') + op.alter_column('print_fact', 'sheets_used', existing_type=sa.INTEGER(), nullable=True) + op.drop_column('print_fact', 'is_deleted') + op.alter_column('file', 'source', existing_type=sa.VARCHAR(), nullable=True) + op.alter_column('file', 'number_of_pages', existing_type=sa.INTEGER(), nullable=True) + op.alter_column('file', 'option_two_sided', existing_type=sa.BOOLEAN(), nullable=True) + op.alter_column('file', 'option_copies', existing_type=sa.INTEGER(), nullable=True) + op.alter_column('file', 'option_pages', existing_type=sa.VARCHAR(), nullable=True) + op.drop_column('file', 'is_deleted') diff --git a/print_service/base.py b/print_service/base.py index e203da4..27d2247 100644 --- a/print_service/base.py +++ b/print_service/base.py @@ -1,17 +1,70 @@ -from pydantic import BaseModel, ConfigDict +from __future__ import annotations +import re -class Base(BaseModel): - def __repr__(self) -> str: +from sqlalchemy import not_ +from sqlalchemy.exc import NoResultFound +from sqlalchemy.orm import Mapped, Query, Session, as_declarative, declared_attr + +from print_service.exceptions import ObjectNotFound + + +@as_declarative() +class Base: + @declared_attr + def __tablename__(cls) -> str: # pylint: disable=no-self-argument + return re.sub(r"(? BaseDbModel: + obj = cls(**kwargs) + session.add(obj) + session.flush() + return obj + + @classmethod + def query(cls, *, with_deleted: bool = False, session: Session) -> Query: + objs = session.query(cls) + if not with_deleted and hasattr(cls, "is_deleted"): + objs = objs.filter(not_(cls.is_deleted)) + return objs + + @classmethod + def get(cls, id: int | str, *, with_deleted=False, session: Session) -> BaseDbModel: + objs = session.query(cls) + if not with_deleted and hasattr(cls, "is_deleted"): + objs = objs.filter(not_(cls.is_deleted)) + try: + if hasattr(cls, "uuid"): + return objs.filter(cls.uuid == id).one() + return objs.filter(cls.id == id).one() + except NoResultFound: + raise ObjectNotFound(cls, id) - model_config = ConfigDict(from_attributes=True, extra="ignore") + @classmethod + def update(cls, id: int | str, *, session: Session, **kwargs) -> BaseDbModel: + obj = cls.get(id, session=session) + for k, v in kwargs.items(): + setattr(obj, k, v) + session.flush() + return obj -class StatusResponseModel(Base): - status: str - message: str - ru: str + @classmethod + def delete(cls, id: int | str, *, session: Session) -> None: + obj = cls.get(id, session=session) + if hasattr(obj, "is_deleted"): + obj.is_deleted = True + else: + session.delete(obj) + session.flush() diff --git a/print_service/exceptions.py b/print_service/exceptions.py index 99ed16a..cb573ce 100644 --- a/print_service/exceptions.py +++ b/print_service/exceptions.py @@ -1,33 +1,61 @@ +from typing import Type + from print_service.settings import get_settings settings = get_settings() -class ObjectNotFound(Exception): - pass +class PrintAPIError(Exception): + eng: str + ru: str + + def __init__(self, eng: str, ru: str) -> None: + self.eng = eng + self.ru = ru + super().__init__(eng) + + +class ObjectNotFound(PrintAPIError): + def __init__(self, obj: type, obj_id_or_name: int | str): + super().__init__( + f"Object {obj.__name__} {obj_id_or_name=} not found", + f"Объект {obj.__name__} с идентификатором {obj_id_or_name} не найден", + ) + + +class AlreadyExists(PrintAPIError): + def __init__(self, obj: type, obj_id_or_name: int | str): + super().__init__( + f"Object {obj.__name__}, {obj_id_or_name=} already exists", + f"Объект {obj.__name__} с идентификатором {obj_id_or_name=} уже существует", + ) class TerminalTokenNotFound(ObjectNotFound): - pass + def __init__(self, token_id: int | str): + super().__init__(type(self), token_id) class TerminalQRNotFound(ObjectNotFound): - pass + def __init__(self, qr_id: int | str): + super().__init__(type(self), qr_id) class PINNotFound(ObjectNotFound): def __init__(self, pin: str): self.pin = pin + super().__init__(type(self), pin) class UserNotFound(ObjectNotFound): - pass + def __init__(self, user_id: int | str): + super().__init__(type(self), user_id) class FileNotFound(ObjectNotFound): - def __init__(self, count: int): - self.count = count + def __init__(self, file_id: int | str): + super().__init__(type(self), file_id) class TooManyPages(Exception): diff --git a/print_service/models/__init__.py b/print_service/models/__init__.py index d1ffcfe..0308c17 100644 --- a/print_service/models/__init__.py +++ b/print_service/models/__init__.py @@ -3,54 +3,55 @@ import math from datetime import datetime -from sqlalchemy import Column, DateTime, Integer, String -from sqlalchemy.ext.declarative import as_declarative +from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String, func from sqlalchemy.orm import Mapped, mapped_column, relationship -from sqlalchemy.sql.schema import ForeignKey -from sqlalchemy.sql.sqltypes import Boolean +from print_service.base import Base, BaseDbModel -@as_declarative() -class Model: - pass +Model = Base -class UnionMember(Model): + +class UnionMember(BaseDbModel): __tablename__ = 'union_member' id: Mapped[int] = mapped_column(Integer, primary_key=True) surname: Mapped[str] = mapped_column(String, nullable=False) union_number: Mapped[str] = mapped_column(String, nullable=True) student_number: Mapped[str] = mapped_column(String, nullable=True) + is_deleted: Mapped[bool] = mapped_column( + Boolean, nullable=False, default=False, server_default="false" + ) files: Mapped[list[File]] = relationship('File', back_populates='owner') print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='owner') -class File(Model): +class File(BaseDbModel): __tablename__ = 'file' - id: Mapped[int] = Column(Integer, primary_key=True) - pin: Mapped[str] = Column(String, nullable=False) - file: Mapped[str] = Column(String, nullable=False) - owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False) - option_pages: Mapped[str] = Column(String) - option_copies: Mapped[int] = Column(Integer) - option_two_sided: Mapped[bool] = Column(Boolean) - created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow) - updated_at: Mapped[datetime] = Column( - DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow + id: Mapped[int] = mapped_column(Integer, primary_key=True) + pin: Mapped[str] = mapped_column(String, nullable=False) + file: Mapped[str] = mapped_column(String, nullable=False) + owner_id: Mapped[int] = mapped_column(Integer, ForeignKey('union_member.id'), nullable=False) + option_pages: Mapped[str] = mapped_column(String) + option_copies: Mapped[int] = mapped_column(Integer) + option_two_sided: Mapped[bool] = mapped_column(Boolean) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.now()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.now(), onupdate=func.now() + ) + number_of_pages: Mapped[int] = mapped_column(Integer) + source: Mapped[str] = mapped_column(String, default='unknown', nullable=False) + is_deleted: Mapped[bool] = mapped_column( + Boolean, nullable=False, default=False, server_default="false" ) - number_of_pages: Mapped[int] = Column(Integer) - source: Mapped[str] = Column(String, default='unknown', nullable=False) owner: Mapped[UnionMember] = relationship('UnionMember', back_populates='files') print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='file') @property def flatten_pages(self) -> list[int] | None: - '''Возвращает расширенный список из элементов списков внутренних целочисленных точек переданного множества отрезков - "1-5, 3, 2" --> [1, 2, 3, 4, 5, 3, 2]''' if self.number_of_pages is None: return None result = list() @@ -63,9 +64,6 @@ def flatten_pages(self) -> list[int] | None: @property def sheets_count(self) -> int | None: - '''Возвращает количество элементов списков внутренних целочисленных точек переданного множества отрезков - "1-5, 3, 2" --> 7 - P.S. 1, 2, 3, 4, 5, 3, 2 -- 7 чисел''' if self.number_of_pages is None: return None if not self.flatten_pages: @@ -79,14 +77,17 @@ def sheets_count(self) -> int | None: return len(self.flatten_pages) * self.option_copies -class PrintFact(Model): +class PrintFact(BaseDbModel): __tablename__ = 'print_fact' - id: Mapped[int] = Column(Integer, primary_key=True) - file_id: Mapped[int] = Column(Integer, ForeignKey('file.id'), nullable=False) - owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False) - created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow) + id: Mapped[int] = mapped_column(Integer, primary_key=True) + file_id: Mapped[int] = mapped_column(Integer, ForeignKey('file.id'), nullable=False) + owner_id: Mapped[int] = mapped_column(Integer, ForeignKey('union_member.id'), nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.now()) + sheets_used: Mapped[int] = mapped_column(Integer) + is_deleted: Mapped[bool] = mapped_column( + Boolean, nullable=False, default=False, server_default="false" + ) owner: Mapped[UnionMember] = relationship('UnionMember', back_populates='print_facts') file: Mapped[File] = relationship('File', back_populates='print_facts') - sheets_used: Mapped[int] = Column(Integer) diff --git a/print_service/routes/admin.py b/print_service/routes/admin.py index da2b7b1..090d349 100644 --- a/print_service/routes/admin.py +++ b/print_service/routes/admin.py @@ -1,11 +1,15 @@ import json import logging +from typing import List, Optional from auth_lib.fastapi import UnionAuth from fastapi import APIRouter, Depends, HTTPException +from fastapi_sqlalchemy import db from redis import Redis from print_service.exceptions import TerminalTokenNotFound +from print_service.models import File as FileModel +from print_service.models import PrintFact, UnionMember from print_service.schema import BaseModel from print_service.settings import Settings, get_settings @@ -23,6 +27,30 @@ class RebootInput(BaseModel): terminal_token: str +class UnionMemberResponse(BaseModel): + id: int + surname: str + union_number: Optional[str] + student_number: Optional[str] + is_deleted: bool + + +class FileResponse(BaseModel): + id: int + pin: str + file: str + owner_id: int + is_deleted: bool + + +class PrintFactResponse(BaseModel): + id: int + file_id: int + owner_id: int + sheets_used: Optional[int] + is_deleted: bool + + class InstantCommandSender: def __init__(self, settings: Settings = None) -> None: settings = settings or get_settings() @@ -67,3 +95,130 @@ async def reboot_terminal( return {'status': 'ok'} sender.redis.close() raise TerminalTokenNotFound() + + +# Soft Delete Management Endpoints + + +@router.get("/users", response_model=List[UnionMemberResponse]) +async def get_all_users( + include_deleted: bool = False, user=Depends(UnionAuth(scopes=["print.admin.users.read"])) +): + logger.info(f"User {user} requested all users") + users = UnionMember.query(session=db.session, with_deleted=include_deleted).all() + return [ + UnionMemberResponse( + id=u.id, + surname=u.surname, + union_number=u.union_number, + student_number=u.student_number, + is_deleted=u.is_deleted, + ) + for u in users + ] + + +@router.delete("/users/{user_id}") +async def delete_user(user_id: int, user=Depends(UnionAuth(scopes=["print.admin.users.delete"]))): + logger.info(f"User {user} deleted user {user_id}") + UnionMember.delete(user_id, session=db.session) + db.session.commit() + return {'status': 'ok', 'message': f'User {user_id} soft deleted'} + + +@router.post("/users/{user_id}/restore") +async def restore_user(user_id: int, user=Depends(UnionAuth(scopes=["print.admin.users.restore"]))): + logger.info(f"User {user} restored user {user_id}") + db_user = UnionMember.get(user_id, session=db.session, with_deleted=True) + if not db_user.is_deleted: + raise HTTPException(status_code=400, detail="User is not deleted") + + db_user.is_deleted = False + db.session.commit() + return {'status': 'ok', 'message': f'User {user_id} restored'} + + +@router.get("/files", response_model=List[FileResponse]) +async def get_all_files( + include_deleted: bool = False, user=Depends(UnionAuth(scopes=["print.admin.files.read"])) +): + """Получить список всех файлов (включая удаленных, если указано)""" + logger.info(f"User {user} requested all files") + files = FileModel.query(session=db.session, with_deleted=include_deleted).all() + return [ + FileResponse( + id=f.id, + pin=f.pin, + file=f.file, + owner_id=f.owner_id, + is_deleted=f.is_deleted, + ) + for f in files + ] + + +@router.delete("/files/{file_id}") +async def delete_file(file_id: int, user=Depends(UnionAuth(scopes=["print.admin.files.delete"]))): + """Мягкое удаление файла""" + logger.info(f"User {user} deleted file {file_id}") + FileModel.delete(file_id, session=db.session) + db.session.commit() + return {'status': 'ok', 'message': f'File {file_id} soft deleted'} + + +@router.post("/files/{file_id}/restore") +async def restore_file(file_id: int, user=Depends(UnionAuth(scopes=["print.admin.files.restore"]))): + """Восстановление мягко удаленного файла""" + logger.info(f"User {user} restored file {file_id}") + db_file = FileModel.get(file_id, session=db.session, with_deleted=True) + if not db_file.is_deleted: + raise HTTPException(status_code=400, detail="File is not deleted") + + db_file.is_deleted = False + db.session.commit() + return {'status': 'ok', 'message': f'File {file_id} restored'} + + +@router.get("/print-facts", response_model=List[PrintFactResponse]) +async def get_all_print_facts( + include_deleted: bool = False, user=Depends(UnionAuth(scopes=["print.admin.print_facts.read"])) +): + """Получить список всех фактов печати (включая удаленных, если указано)""" + logger.info(f"User {user} requested all print facts") + facts = PrintFact.query(session=db.session, with_deleted=include_deleted).all() + return [ + PrintFactResponse( + id=pf.id, + file_id=pf.file_id, + owner_id=pf.owner_id, + sheets_used=pf.sheets_used, + is_deleted=pf.is_deleted, + ) + for pf in facts + ] + + +@router.delete("/print-facts/{print_fact_id}") +async def delete_print_fact( + print_fact_id: int, user=Depends(UnionAuth(scopes=["print.admin.print_facts.delete"])) +): + """Мягкое удаление факта печати""" + logger.info(f"User {user} deleted print fact {print_fact_id}") + PrintFact.delete(print_fact_id, session=db.session) + db.session.commit() + return {'status': 'ok', 'message': f'Print fact {print_fact_id} soft deleted'} + + +@router.post("/print-facts/{print_fact_id}/restore") +async def restore_print_fact( + print_fact_id: int, user=Depends(UnionAuth(scopes=["print.admin.print_facts.restore"])) +): + """Восстановление мягко удаленного факта печати""" + logger.info(f"User {user} restored print fact {print_fact_id}") + db_fact = PrintFact.get(print_fact_id, session=db.session, with_deleted=True) + if not db_fact.is_deleted: + raise HTTPException(status_code=400, detail="Print fact is not deleted") + + db_fact.is_deleted = False + db.session.commit() + return {'status': 'ok', 'message': f'Print fact {print_fact_id} restored'} diff --git a/print_service/routes/exc_handlers.py b/print_service/routes/exc_handlers.py index ecb8aeb..7a5c1ce 100644 --- a/print_service/routes/exc_handlers.py +++ b/print_service/routes/exc_handlers.py @@ -2,7 +2,6 @@ import starlette.requests from starlette.responses import JSONResponse -from print_service.base import StatusResponseModel from print_service.exceptions import ( AlreadyUploaded, FileIsNotReceived, @@ -23,6 +22,7 @@ UserNotFound, ) from print_service.routes.base import app +from print_service.schema import StatusResponseModel from print_service.settings import get_settings diff --git a/print_service/routes/file.py b/print_service/routes/file.py index 21e3570..90cd56c 100644 --- a/print_service/routes/file.py +++ b/print_service/routes/file.py @@ -12,7 +12,6 @@ from pydantic import Field, field_validator from sqlalchemy import func, or_ -from print_service.base import StatusResponseModel from print_service.exceptions import ( AlreadyUploaded, FileIsNotReceived, @@ -29,7 +28,7 @@ ) from print_service.models import File as FileModel from print_service.models import UnionMember -from print_service.schema import BaseModel +from print_service.schema import BaseModel, StatusResponseModel from print_service.settings import Settings, get_settings from print_service.utils import checking_for_pdf, generate_filename, generate_pin, get_file @@ -38,7 +37,6 @@ router = APIRouter() -# region Schemas class PrintOptions(BaseModel): pages: str = Field('', description='Страницы для печати', example='2-4,6') copies: int = Field(1, description='Количество копий для печати') @@ -98,10 +96,6 @@ class ReceiveOutput(BaseModel): options: PrintOptions -# endregion - - -# region handlers @router.post( '', responses={ @@ -115,11 +109,8 @@ async def send( user_auth=Depends(UnionAuth(allow_none=True)), settings: Settings = Depends(get_settings), ): - """Получить пин код для загрузки и скачивания файла. - - Полученный пин-код можно использовать в методах POST и GET `/file/{pin}`. - """ - user = db.session.query(UnionMember) + """Получить пин код для загрузки и скачивания файла""" + user = UnionMember.query(session=db.session) if not settings.ALLOW_STUDENT_NUMBER: user = user.filter(UnionMember.union_number != None) @@ -177,16 +168,11 @@ async def send( async def upload_file( pin: str, file: UploadFile = File(...), settings: Settings = Depends(get_settings) ): - """Загрузить файл на сервер. - - Требует пин-код, полученный в методе POST `/file`. Файл для пин-кода можно - загрузить лишь один раз. Файл должен быть размером до 5 000 000 байт - (меняется в настройках сервера). - """ + """Загрузить файл на сервер""" if file == ...: raise FileIsNotReceived() file_model = ( - db.session.query(FileModel) + FileModel.query(session=db.session) .filter(func.upper(FileModel.pin) == pin.upper()) .order_by(FileModel.created_at.desc()) .one_or_none() @@ -247,18 +233,14 @@ async def upload_file( async def update_file_options( pin: str, inp: SendInputUpdate, settings: Settings = Depends(get_settings) ): - """Обновляет настройки печати. - - Требует пин-код, полученный в методе POST `/file`. Обновлять настройки - можно бесконечное количество раз. Можно изменять настройки по одной.""" + """Обновляет настройки печати""" options = inp.options.model_dump(exclude_unset=True) file_model = ( - db.session.query(FileModel) + FileModel.query(session=db.session) .filter(func.upper(FileModel.pin) == pin.upper()) .order_by(FileModel.created_at.desc()) .one_or_none() ) - print(options) if not file_model: raise PINNotFound(pin) file_model.option_pages = options.get('pages') or file_model.option_pages @@ -292,13 +274,5 @@ async def update_file_options( response_model=ReceiveOutput, ) async def print_file(pin: str, settings: Settings = Depends(get_settings)): - """Получить файл для печати. - - Требует пин-код, полученный в методе POST `/file`. Файл можно скачать - бесконечное количество раз в течение 7 дней после загрузки (меняется в - настройках сервера). - """ + """Получить файл для печати""" return get_file(db.session, pin)[0] - - -# endregion diff --git a/print_service/routes/user.py b/print_service/routes/user.py index 34a3a3c..9f9b465 100644 --- a/print_service/routes/user.py +++ b/print_service/routes/user.py @@ -20,7 +20,6 @@ settings = get_settings() -# region schemas class UserCreate(BaseModel): username: constr(strip_whitespace=True, to_upper=True, min_length=1) union_number: Optional[constr(strip_whitespace=True, to_upper=True, min_length=1)] @@ -31,12 +30,6 @@ class UpdateUserList(BaseModel): users: List[UserCreate] -# endregion - - -# region handlers - - @router.get( '/is_union_member', status_code=202, @@ -49,10 +42,9 @@ async def check_union_member( number: constr(strip_whitespace=True, to_upper=True, min_length=1), v: Optional[str] = __version__, ): - """Проверяет наличие пользователя в списке.""" surname = surname.upper() - user = db.session.query(UnionMember) + user = UnionMember.query(session=db.session) if not settings.ALLOW_STUDENT_NUMBER: user = user.filter(UnionMember.union_number != None) user: UnionMember = user.filter( @@ -82,7 +74,6 @@ def update_list( input: UpdateUserList, user=Depends(UnionAuth(scopes=["print.user.create", "print.user.update", "print.user.delete"])), ): - """Обновляет данные существующего пользователя или добавляет нового, если его нет.""" logger.info(f"User {user} updated list") union_numbers = [user.union_number for user in input.users if user.union_number is not None] @@ -95,7 +86,7 @@ def update_list( for user in input.users: db_user: UnionMember = ( - db.session.query(UnionMember) + UnionMember.query(session=db.session) .filter( or_( and_( @@ -115,6 +106,8 @@ def update_list( db_user.surname = user.username db_user.union_number = user.union_number db_user.student_number = user.student_number + if db_user.is_deleted: + db_user.is_deleted = False else: db.session.add( UnionMember( @@ -127,6 +120,3 @@ def update_list( db.session.commit() return {"status": "ok", "count": len(input.users)} - - -# endregion diff --git a/print_service/schema.py b/print_service/schema.py index fa0080b..9d6e1f2 100644 --- a/print_service/schema.py +++ b/print_service/schema.py @@ -1,4 +1,10 @@ from pydantic import BaseModel -__all__ = ('BaseModel',) +class StatusResponseModel(BaseModel): + status: str + message: str + ru: str + + +__all__ = ('BaseModel', 'StatusResponseModel') diff --git a/print_service/utils/__init__.py b/print_service/utils/__init__.py index 1c44b33..8837cb0 100644 --- a/print_service/utils/__init__.py +++ b/print_service/utils/__init__.py @@ -31,7 +31,7 @@ def generate_pin(session: Session): for i in range(15): pin = ''.join(random.choice(settings.PIN_SYMBOLS) for _ in range(settings.PIN_LENGTH)) cnt = ( - session.query(File) + File.query(session=session) .filter( File.pin == pin, File.created_at + timedelta(hours=settings.STORAGE_TIME) >= datetime.utcnow(), @@ -57,7 +57,7 @@ def generate_filename(original_filename: str): def get_file(dbsession, pin: str or list[str]): pin = [pin.upper()] if isinstance(pin, str) else tuple(p.upper() for p in pin) files: list[FileModel] = ( - dbsession.query(FileModel) + FileModel.query(session=dbsession) .filter(func.upper(FileModel.pin).in_(pin)) .order_by(FileModel.created_at.desc()) .all() @@ -92,15 +92,6 @@ def get_file(dbsession, pin: str or list[str]): def checking_for_pdf(f: bytes) -> tuple[bool, int]: - """_summary_ - - Args: - f (bytes): file to check - - Returns: - tuple[bool, int]: The first argument returns whether the file is a valid pdf. - The second argument returns the number of pages in the pdf document (0- if the check failed) - """ try: pdf_file = PdfFileReader(io.BytesIO(f)) return True, pdf_file.getNumPages()