Skip to content

MoloF/python-multiprocessing-queues

Repository files navigation

📡 Python Multiprocessing Queues

Python Requests Requests

🚀 Python RabbitMQ Distribution Load (PRDL) — это проект, который решает проблему распределительной нагрузки с помощью внешних очередей RabbitMQ и внутренних очередй Python (multiprocessing.Queue).

📌 Возможности

  • 🔥 Принимает API запросы через FastAPI.
  • 📝 Обрабатывает тяжулую логику в разных потоках через multiprocessing.Queue.
  • 📊 Сохраняет результаты в Redis.
  • 🔄 Поддерживает остановку задач через API.
  • 🤖 Легкий деплой с помощью Docker compose.
  • 📦 Автодеплой (CI/CD) с помощью Jenkins Pipeline.

📥 Установка

# 1. Клонируем репозиторий
$ git clone https://github.com/MoloF/python-multiprocessing-queues
$ cd python-multiprocessing-queues

# 2. Создаем виртуальное окружение
$ python3 -m venv .venv
$ source .venv/bin/activate  # для Linux/Mac
$ .venv\Scripts\activate  # для Windows

# 3. Устанавливаем зависимости
$ pip3 install -r requirements.txt

🛠 Использование в режиме разработки

# Запуск API
$ uvicorn app.api:app --reload
# Запуск Worker
$ python3 -m app.main

📌 Пример кода

from app.queues.settings import Actor


@Actor.declare()
async def sample(a: int, b: int):
    result = a + b

    print("Task [sample] completed with result: %d" % (result, ))

    return result

⚠️ Причина создания проекта и решение проблем

  • Проведён аудит использования разных сервисов Celery, Dramatiq, RQ.
  • Аудит выявил большое количество архитектурных проблем.
    • Celery работал нестабильно и не справлялся с большой нагрузкой, он так же не мог выполнять длительные очереди.
    • Dramatiq создавал очень много ненужных подключений и в итоге вызывал проблемы с бешеной нагрузкой на целевой сервер.
    • RQ не подходил под архитектуру, нужно было более надежное решение (like RabbitMQ).
  • Решено было создать собственное решение которое на основе AIORMQ создавало одно подключение к брокеру сообщений.
  • Все поступаемые сообщения перенаправлялись в собственную очередь Python через multiprocessing.Queue.
  • Обработка очереди происходит в каждом из потоков и изолирована с целью достижения максимальной распределительной нагрузки.

📝 TODO

  • Добавить сохранение метаданных в результаты
  • Реализовать логирование
  • Избавиться от подключений к редису в каждом из потоков

🤝 Контакты

🔹 Автор: MoloF
🔹 Email: MoloF.1337@gmail.com
🔹 Telegram: @emptyphses

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors