diff --git a/benchmarks/llm_benchmark.exs b/benchmarks/llm_benchmark.exs new file mode 100644 index 00000000..acb419bb --- /dev/null +++ b/benchmarks/llm_benchmark.exs @@ -0,0 +1,21 @@ +# LLM Abstraction Layer Benchmarks +# +# Run with: mix run benchmarks/llm_benchmark.exs + +Benchee.run(%{ + "registry lookup" => fn -> + Lux.LLM.Registry.list_providers() + end, + "router classify (simple)" => fn -> + Lux.LLM.Router.classify_request("Hello world", []) + end, + "router classify (tools)" => fn -> + Lux.LLM.Router.classify_request("Use the search function to find results", []) + end, + "cache hash key" => fn -> + :erlang.phash2({"prompt text", Lux.LLM.OpenAI, "gpt-4", 0.7}) + end, + "monitoring estimate tokens" => fn -> + Lux.LLM.Monitoring.estimate_tokens(String.duplicate("Hello world ", 100)) + end +}, memory_time: 2, time: 5) diff --git a/guides/lenses/telegram_integration.livemd b/guides/lenses/telegram_integration.livemd new file mode 100644 index 00000000..b4eec5b0 --- /dev/null +++ b/guides/lenses/telegram_integration.livemd @@ -0,0 +1,322 @@ +# Telegram Integration Guide + +Welcome to the Lux Telegram Bot API Lens! This guide covers everything you need to know to build powerful Telegram bots with Lux. + +## Overview + +`Lux.Lenses.TelegramLens` provides a complete, type-safe interface to the Telegram Bot API. It handles: + +- **All core Bot API methods** (messages, media, chat management) +- **Rate limiting** with token bucket queue (30 msg/sec global limit) +- **Webhook & polling** update handlers +- **Inline & custom keyboards** +- **Error handling** with automatic retries +- **Media uploads** (photos, documents, voice, video) + +## Setup + +### 1. Get Your Bot Token + +Start a chat with [@BotFather](https://t.me/botfather) on Telegram: + +1. Send `/newbot` +2. Follow the prompts to name your bot +3. Copy the token you receive + +### 2. Configure the Token + +**Option A: Environment variable** + +```bash +export TELEGRAM_BOT_TOKEN="your_token_here" +``` + +**Option B: Elixir config** + +```elixir +# config/config.exs +config :lux, :telegram_token, "your_token_here" +``` + +## Quick Start + +### Send a Message + +```elixir +alias Lux.Lenses.TelegramLens + +# Basic message +{:ok, message} = TelegramLens.send_message(123_456_789, "Hello from Lux!") + +# With HTML formatting +{:ok, message} = TelegramLens.send_message(123_456_789, "Bold and italic", + parse_mode: "HTML" +) + +# Disable notification +{:ok, message} = TelegramLens.send_message(123_456_789, "Silent message", + disable_notification: true +) +``` + +### Send Media + +```elixir +# Send a photo +{:ok, message} = TelegramLens.send_photo(chat_id, "/path/to/photo.jpg", + caption: "A beautiful sunset" +) + +# Send a document +{:ok, message} = TelegramLens.send_document(chat_id, "/path/to/report.pdf") + +# Send voice message +{:ok, message} = TelegramLens.send_voice(chat_id, "/path/to/audio.ogg", + duration: 30 +) + +# Send video +{:ok, message} = TelegramLens.send_video(chat_id, "/path/to/video.mp4", + supports_streaming: true +) +``` + +### Inline Keyboards + +```elixir +alias Lux.Lenses.TelegramLens + +# Create inline keyboard +keyboard = TelegramLens.inline_keyboard([ + [ + TelegramLens.button("Option A", "choice_a"), + TelegramLens.button("Option B", "choice_b") + ], + [ + TelegramLens.button("Visit Website", nil, url: "https://example.com") + ] +]) + +# Send with keyboard +{:ok, message} = TelegramLens.send_message(chat_id, "Choose an option:", + reply_markup: keyboard +) +``` + +### Message Editing & Deletion + +```elixir +# Edit a message +{:ok, message} = TelegramLens.edit_message_text(chat_id, message_id, nil, + "Updated text here", + parse_mode: "HTML" +) + +# Delete a message +:ok = TelegramLens.delete_message(chat_id, message_id) +``` + +### Polls + +```elixir +# Regular anonymous poll +{:ok, message} = TelegramLens.send_poll(chat_id, "Favorite programming language?", + ["Elixir", "Rust", "Python", "Go"] +) + +# Quiz with correct answer +{:ok, message} = TelegramLens.send_poll(chat_id, "What is 2 + 2?", + ["3", "4", "5"], + type: "quiz", + correct_option_id: 1, + explanation: "Basic arithmetic!" +) + +# Close a poll +{:ok, poll} = TelegramLens.close_poll(chat_id, message_id) +``` + +## Receiving Updates + +### Option 1: Long Polling + +```elixir +defmodule MyBot do + def handle_update(update) do + if message = update["message"] do + chat_id = message["chat"]["id"] + text = message["text"] + + case text do + "/start" -> + TelegramLens.send_message(chat_id, "Welcome! Type /help for commands.") + + "/help" -> + TelegramLens.send_message(chat_id, "Available commands: /start, /help") + + _ -> + TelegramLens.send_message(chat_id, "You said: #{text}") + end + end + + :acknowledge + end +end + +# Start polling +{:ok, pid} = Lux.Lenses.TelegramLens.Polling.start_link( + handler: {MyBot, :handle_update}, + timeout: 30 +) +``` + +### Option 2: Webhook + +```elixir +# In your Phoenix/Raxx endpoint: + +plug Lux.Lenses.TelegramLens.Webhook, + handler: {MyBot, :handle_update}, + secret: System.get_env("TELEGRAM_WEBHOOK_SECRET") + +# Set the webhook +:ok = TelegramLens.set_webhook("https://myapp.com/telegram/webhook", + max_connections: 40, + allowed_updates: ["message", "callback_query"] +) +``` + +### Handling Callback Queries + +```elixir +defmodule MyBot do + def handle_update(%{"callback_query" => query}) do + data = query["data"] + message = query["message"] + + case data do + "choice_a" -> + TelegramLens.send_message(message["chat"]["id"], "You chose A!") + + "choice_b" -> + TelegramLens.send_message(message["chat"]["id"], "You chose B!") + end + + # Always answer callback queries to remove loading state + TelegramLens.answer_callback_query(query["id"]) + + :acknowledge + end +end +``` + +## Rate Limiting + +Telegram limits bots to ~30 messages per second globally. Lux handles this automatically: + +- **Write operations** (sendMessage, sendPhoto, etc.) are rate-limited +- **Read operations** (getMe, getChat, getUpdates) bypass the limiter +- Requests are queued with a 35-second timeout +- 429 errors are automatically retried after `retry_after` + +```elixir +# Burst send messages (will queue automatically) +for i <- 1..50 do + Task.async(fn -> + TelegramLens.send_message(chat_id, "Message #{i}") + end) +end +|> Task.await_many() +``` + +## Error Handling + +All functions return `{:ok, result}` or `{:error, reason}`: + +```elixir +case TelegramLens.send_message(chat_id, text) do + {:ok, message} -> + IO.puts("Sent: #{message["message_id"]}") + + {:error, {:telegram_error, code, desc}} -> + IO.puts("Telegram error #{code}: #{desc}") + + {:error, :rate_limited} -> + IO.puts("Rate limited, try again later") + + {:error, reason} -> + IO.puts("Network error: #{inspect(reason)}") +end +``` + +## Type Reference + +### Keyboard Buttons + +```elixir +# Callback button +TelegramLens.button("Click me", "callback_data") + +# URL button +TelegramLens.button("Visit", nil, url: "https://example.com") + +# Switch inline query +TelegramLens.button("Search", nil, switch_inline_query: "query") +``` + +### Inline Keyboard + +```elixir +keyboard = TelegramLens.inline_keyboard([ + # Row 1 + [ + TelegramLens.button("A", "a"), + TelegramLens.button("B", "b") + ], + # Row 2 + [ + TelegramLens.button("Link", nil, url: "https://example.com") + ] +]) +``` + +## Testing + +```elixir +# In your test file: +use ExUnit.Case, async: true + +test "sends message" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, _} = Plug.Conn.read_body(conn) + assert body["chat_id"] == 123 + assert body["text"] == "Hello" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 1}})) + end) + + assert {:ok, %{"message_id" => 1}} = TelegramLens.send_message(123, "Hello") +end +``` + +## Performance + +The lens is designed for high throughput: + +- **Concurrent requests**: Use `Task.async_stream/3` for parallel sends +- **Batched media**: Upload photos/documents in parallel, they each consume 1 token +- **Webhook preferred**: For >1000 messages/hour, use webhooks (no polling overhead) +- **Queue timeout**: 35 seconds max wait for rate limit tokens + +## Configuration Options + +```elixir +# Rate limiter settings +config :lux, :telegram_rate_limiter, + bucket_size: 30, + refill_rate: 30, + queue_timeout: 35_000 + +# Webhook secret +config :lux, :telegram, + webhook_secret: "your_secret" +``` diff --git a/guides/llm_abstraction.livemd b/guides/llm_abstraction.livemd new file mode 100644 index 00000000..78ac5eec --- /dev/null +++ b/guides/llm_abstraction.livemd @@ -0,0 +1,153 @@ +# LLM Provider Abstraction Layer + +## Overview + +The LLM Provider Abstraction Layer provides a unified interface for interacting +with multiple LLM providers (OpenAI, Anthropic, TogetherAI, Mira) through a +single API with automatic routing, fallback, caching, and cost tracking. + +## Setup + +Start the abstraction layer: + +```elixir +Lux.LLM.Manager.start_link() +``` + +## Basic Usage + +### Simple Chat + +```elixir +{:ok, response} = Lux.LLM.chat("What is Elixir?", []) +IO.puts(response.content) +``` + +### With Tools + +```elixir +tools = [my_prism, my_lens] +{:ok, response} = Lux.LLM.chat("Analyze this data", tools) +``` + +### Specific Provider + +```elixir +{:ok, response} = Lux.LLM.chat("Hello", [], provider: Lux.LLM.Anthropic) +``` + +## Routing Strategies + +```elixir +# Route by cost (cheapest model) +{:ok, response} = Lux.LLM.chat("Simple question", [], strategy: :cost) + +# Route by latency (fastest provider) +{:ok, response} = Lux.LLM.chat("Quick response needed", [], strategy: :latency) + +# Route by quality (best model) +{:ok, response} = Lux.LLM.chat("Complex analysis", [], strategy: :quality) + +# Round-robin +{:ok, response} = Lux.LLM.chat("Balance load", [], strategy: :round_robin) + +# Fallback chain +{:ok, response} = Lux.LLM.chat("Must succeed", [], strategy: :fallback, + chain: [Lux.LLM.OpenAI, Lux.LLM.Anthropic, Lux.LLM.TogetherAI]) +``` + +## Caching + +```elixir +# Enable caching +{:ok, response} = Lux.LLM.chat("Frequently asked question", [], cache: true) + +# Custom TTL (60 seconds) +{:ok, response} = Lux.LLM.chat("Cached question", [], + cache: true, cache_ttl_ms: 60_000) + +# Check cache stats +Lux.LLM.cache_stats() +# => %{size: 5, hits: 20, misses: 3, hit_rate: 0.87} +``` + +## Cost Tracking + +```elixir +# Get total cost +Lux.LLM.total_cost() +# => 0.0452 + +# Get usage report per provider +Lux.LLM.usage_report() +# => [ +# %{provider: Lux.LLM.OpenAI, request_count: 15, total_cost: 0.032, ...}, +# %{provider: Lux.LLM.Anthropic, request_count: 5, total_cost: 0.013, ...} +# ] +``` + +## Circuit Breaker + +```elixir +# Check if a provider is healthy +Lux.LLM.Fallback.circuit_open?(Lux.LLM.OpenAI) +# => false + +# Get health score (0.0 - 1.0) +Lux.LLM.Fallback.health_score(Lux.LLM.OpenAI) +# => 1.0 + +# Reset a circuit breaker +Lux.LLM.Fallback.reset_circuit(Lux.LLM.OpenAI) +``` + +## Provider Registry + +```elixir +# List all providers +Lux.LLM.Registry.list_providers() +# => [Lux.LLM.OpenAI, Lux.LLM.Anthropic, Lux.LLM.TogetherAI, Lux.LLM.Mira] + +# Get default provider +Lux.LLM.Registry.get_default() +# => Lux.LLM.OpenAI + +# Set default +Lux.LLM.Registry.set_default(Lux.LLM.Anthropic) + +# Find cheapest model meeting requirements +Lux.LLM.Registry.find_model( + Lux.LLM.Registry.list_providers(), + capabilities: [:tools], + min_context: 32_000 +) +``` + +## Configuration + +```elixir +Lux.LLM.Manager.configure( + default_provider: Lux.LLM.Anthropic, + routing_strategy: :quality, + fallback_chain: [Lux.LLM.Anthropic, Lux.LLM.OpenAI, Lux.LLM.TogetherAI], + cache_max_size: 2000 +) +``` + +## Architecture + +``` +Lux.LLM.chat/3 + | + v +Lux.LLM.Manager (orchestration) + |-- Lux.LLM.Router (provider selection) + |-- Lux.LLM.Fallback (circuit breaker + retry) + |-- Lux.LLM.Cache (ETS LRU cache) + |-- Lux.LLM.Monitoring (cost + perf tracking) + |-- Lux.LLM.Registry (provider registry) + |-- Lux.LLM.Provider (behaviour + helpers) + | + v +Provider.call/3 (OpenAI, Anthropic, TogetherAI, Mira) +``` diff --git a/lib/lux/lenses/telegram/client.ex b/lib/lux/lenses/telegram/client.ex new file mode 100644 index 00000000..fa9d5c11 --- /dev/null +++ b/lib/lux/lenses/telegram/client.ex @@ -0,0 +1,169 @@ +defmodule Lux.Lenses.TelegramLens.Client do + @moduledoc """ + HTTP client for the Telegram Bot API. + + Handles all HTTP requests to the Telegram API with: + - Exponential backoff retry on transient failures + - Rate-limit awareness (429 with retry_after) + - Comprehensive error mapping + - Token management from config or environment + """ + + @endpoint "https://api.telegram.org/bot" + @retryable_statuses [429, 500, 502, 503, 504] + @max_retries 3 + + @type method :: String.t() + @type params :: map() + @type opts :: keyword() + @type result :: {:ok, term()} | {:error, term()} + + # -------------------------------------------------------------------------- + # Public API + # -------------------------------------------------------------------------- + + @doc """ + Make a request to the Telegram Bot API. + + ## Parameters + - `method`: The Telegram API method name (e.g., "sendMessage") + - `params`: Map of parameters for the method + - `opts`: Keyword list of options + + ## Options + - `:token` - Bot token (defaults to config/env) + - `:max_retries` - Maximum retry attempts (default 3) + - `:test_mode` - Use test servers (default false) + + ## Returns + `{:ok, result}` or `{:error, reason}` + """ + @spec request(method(), params(), opts()) :: result() + def request(method, params \\ %{}, opts \\ []) do + token = resolve_token(opts) + max_retries = Keyword.get(opts, :max_retries, @max_retries) + + if is_nil(token) or token == "" do + {:error, "Telegram bot token not found. Set TELEGRAM_BOT_TOKEN or configure :lux, :telegram_token"} + else + url = build_url(token, method, opts) + do_request(method, url, params, max_retries, 0) + end + end + + @doc """ + Make a multipart request (for file uploads). + + Same as `request/3` but uses multipart/form-data encoding. + """ + @spec request_multipart(method(), params(), opts()) :: result() + def request_multipart(method, params \\ %{}, opts \\ []) do + token = resolve_token(opts) + max_retries = Keyword.get(opts, :max_retries, @max_retries) + + if is_nil(token) or token == "" do + {:error, "Telegram bot token not found"} + else + url = build_url(token, method, opts) + do_multipart_request(url, params, max_retries, 0) + end + end + + # -------------------------------------------------------------------------- + # Private + # -------------------------------------------------------------------------- + + defp resolve_token(opts) do + case Keyword.get(opts, :token) do + nil -> + Application.get_env(:lux, :telegram_token) || + System.get_env("TELEGRAM_BOT_TOKEN") + token -> + token + end + end + + defp build_url(token, method, opts) do + test_mode = Keyword.get(opts, :test_mode, false) + base = if test_mode, do: "https://api.telegram.org/bot", else: @endpoint + "#{base}#{token}/#{method}" + end + + defp do_request(_method, url, params, max_retries, attempt) do + json_body = Jason.encode!(params) + + Req.post(url, json: params) + |> process_response(url, params, max_retries, attempt) + end + + defp do_multipart_request(url, params, max_retries, attempt) do + # Handle file uploads via multipart + multipart = build_multipart(params) + + Req.post(url, body: multipart) + |> process_response(url, params, max_retries, attempt) + end + + defp build_multipart(params) do + # Build multipart body for file uploads + multipart_parts = for {key, value} <- params do + case value do + %{path: path} -> + {:file, path, key, []} + %{file_id: file_id} -> + {:file, {:form, file_id}, key, [{"content-type", "application/octet-stream"}]} + _ -> + {:multipart, [{key, value}]} + end + end + {:multipart, multipart_parts} + end + + defp process_response({:ok, %{status: 200, body: %{"ok" => true, "result" => result}}}, _url, _params, _max_retries, _attempt) do + {:ok, result} + end + + defp process_response({:ok, %{status: 429, body: %{"ok" => false, "error_code" => 429, "parameters" => %{"retry_after" => retry_after}}}}, url, params, max_retries, attempt) do + if attempt < max_retries do + Logger.warning("Telegram rate limited. Retrying after #{retry_after}s") + :timer.sleep(retry_after * 1000) + do_request("retry", url, params, max_retries, attempt + 1) + else + {:error, :rate_limited} + end + end + + defp process_response({:ok, %{status: status, body: %{"ok" => false, "description" => desc, "error_code" => code}}}, _url, _params, _max_retries, _attempt) do + {:error, {:telegram_error, code, desc}} + end + + defp process_response({:ok, %{status: status}}, _url, _params, max_retries, attempt) when status in @retryable_statuses do + if attempt < max_retries do + backoff = trunc(:math.pow(2, attempt) * 500) + Logger.warning("Telegram API error #{status}. Retrying in #{backoff}ms (attempt #{attempt + 1}/#{max_retries})") + :timer.sleep(backoff) + do_request("retry", "", %{}, max_retries, attempt + 1) + else + {:error, {:api_error, status}} + end + end + + defp process_response({:error, %{reason: %{__struct__: Tesla.TransportError, message: msg}}}, _url, _params, max_retries, attempt) do + if attempt < max_retries do + backoff = trunc(:math.pow(2, attempt) * 1000) + Logger.warning("Network error: #{msg}. Retrying in #{backoff}ms") + :timer.sleep(backoff) + do_request("retry", "", %{}, max_retries, attempt + 1) + else + {:error, {:network_error, msg}} + end + end + + defp process_response({:error, reason}, _url, _params, _max_retries, _attempt) do + {:error, reason} + end + + defp process_response(other, _url, _params, _max_retries, _attempt) do + {:error, {:unexpected_response, inspect(other)}} + end +end diff --git a/lib/lux/lenses/telegram/polling.ex b/lib/lux/lenses/telegram/polling.ex new file mode 100644 index 00000000..d181df6d --- /dev/null +++ b/lib/lux/lenses/telegram/polling.ex @@ -0,0 +1,213 @@ +defmodule Lux.Lenses.TelegramLens.Polling do + @moduledoc """ + Long polling handler for Telegram updates. + + Implements efficient long polling that: + - Tracks the update offset to avoid duplicate processing + - Automatically acknowledges processed updates + - Handles rate limits gracefully + - Supports graceful shutdown + + ## Usage + + {:ok, pid} = Lux.Lenses.TelegramLens.Polling.start_link( + handler: {MyBot, :handle_update}, + timeout: 30 + ) + + ## Options + + - `:handler` - Module and function to call for each update + - `:timeout` - Long polling timeout in seconds (default 30) + - `:limit` - Max updates per poll (default 100) + - `:allowed_updates` - Types of updates to receive + """ + + use GenServer + require Logger + + @default_timeout 30 + @default_limit 100 + @default_allowed_updates ["message", "edited_message", "callback_query"] + + @type handler :: {module(), atom()} + @type state :: %{ + handler: handler, + offset: integer(), + timeout: integer(), + limit: integer(), + allowed_updates: [String.t()], + running: boolean() + } + + # -------------------------------------------------------------------------- + # Client API + # -------------------------------------------------------------------------- + + @doc """ + Start the polling server. + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: opts[:name] || __MODULE__) + end + + @doc """ + Stop the polling server. + """ + @spec stop(GenServer.server()) :: :ok + def stop(pid) do + GenServer.call(pid, :stop) + end + + @doc """ + Pause polling (e.g., during maintenance). + """ + @spec pause(GenServer.server()) :: :ok + def pause(pid) do + GenServer.call(pid, :pause) + end + + @doc """ + Resume polling. + """ + @spec resume(GenServer.server()) :: :ok + def resume(pid) do + GenServer.call(pid, :resume) + end + + @doc """ + Get current polling state. + """ + @spec get_state(GenServer.server()) :: map() + def get_state(pid) do + GenServer.call(pid, :get_state) + end + + # -------------------------------------------------------------------------- + # GenServer Callbacks + # -------------------------------------------------------------------------- + + @impl true + def init(opts) do + handler = Keyword.fetch!(opts, :handler) + + state = %{ + handler: handler, + offset: Keyword.get(opts, :offset, 0), + timeout: Keyword.get(opts, :timeout, @default_timeout), + limit: Keyword.get(opts, :limit, @default_limit), + allowed_updates: Keyword.get(opts, :allowed_updates, @default_allowed_updates), + running: true + } + + {:ok, state, {:continue, :start_polling}} + end + + @impl true + def handle_continue(:start_polling, state) do + poll_loop(state) + {:noreply, state} + end + + @impl true + def handle_call(:stop, _from, state) do + {:stop, :normal, :ok, %{state | running: false}} + end + + @impl true + def handle_call(:pause, _from, state) do + {:reply, :ok, %{state | running: false}} + end + + @impl true + def handle_call(:resume, _from, state) do + poll_loop(state) + {:reply, :ok, state} + end + + @impl true + def handle_call(:get_state, _from, state) do + {:reply, state, state} + end + + @impl true + def handle_info(:poll, state) do + poll_loop(state) + {:noreply, state} + end + + # -------------------------------------------------------------------------- + # Polling Loop + # -------------------------------------------------------------------------- + + defp poll_loop(%{running: false} = state) do + :ok + end + + defp poll_loop(%{running: true} = state) do + case TelegramLens.get_updates( + offset: state.offset, + limit: state.limit, + timeout: state.timeout, + allowed_updates: state.allowed_updates + ) do + {:ok, updates} when is_list(updates) -> + new_state = process_updates(updates, state) + schedule_poll() + {:noreply, new_state} + + {:error, :rate_limited} -> + Logger.info("Polling: rate limited, waiting 5s") + :timer.sleep(5_000) + schedule_poll() + {:noreply, state} + + {:error, reason} -> + Logger.warning("Polling error: #{inspect(reason)}, retrying in 5s") + :timer.sleep(5_000) + schedule_poll() + {:noreply, state} + end + end + + defp process_updates([], state), do: state + + defp process_updates(updates, state) do + {max_offset, new_state} = + Enum.reduce(updates, {state.offset, state}, fn update, {max_off, st} -> + case dispatch_handler(update, st) do + :ok -> + off = max_off + new_off = max(off, update["update_id"] + 1) + {new_off, %{st | offset: new_off}} + + :acknowledge -> + {max_off, st} + end + end) + + %{new_state | offset: max_offset} + end + + defp dispatch_handler(update, %{handler: {mod, fun}}) do + try do + case apply(mod, fun, [update]) do + :ok -> :ok + :acknowledge -> :acknowledge + :skip -> :acknowledge + other -> + Logger.warning("Unknown handler response: #{inspect(other)}, acknowledging") + :acknowledge + end + rescue + e -> + Logger.error("Handler error for update #{update["update_id"]}: #{inspect(e)}") + :acknowledge + end + end + + defp schedule_poll do + send(self(), :poll) + end +end diff --git a/lib/lux/lenses/telegram/rate_limiter.ex b/lib/lux/lenses/telegram/rate_limiter.ex new file mode 100644 index 00000000..23fb4580 --- /dev/null +++ b/lib/lux/lenses/telegram/rate_limiter.ex @@ -0,0 +1,212 @@ +defmodule Lux.Lenses.TelegramLens.RateLimiter do + @moduledoc """ + Token bucket rate limiter for the Telegram Bot API. + + Telegram limits bots to ~30 messages per second globally. + This module implements a token bucket algorithm to manage request + quotas and queue requests when the rate limit is approached. + + ## Usage + + RateLimiter.run([], fn -> Client.request("sendMessage", params, []) end) + + ## Configuration + + - `:bucket_size` - Maximum tokens in the bucket (default 30) + - `:refill_rate` - Tokens added per second (default 30) + - `:refill_interval` - Refill interval in ms (default 1000) + - `:queue_timeout` - Max time to wait for a token in ms (default 35_000) + """ + + use GenServer + + alias Lux.Lenses.TelegramLens.RateLimiter + + @default_bucket_size 30 + @default_refill_rate 30 + @default_refill_interval 1_000 + @default_queue_timeout 35_000 + + @type bucket :: %{tokens: float, last_refill: non_neg_integer()} + @type opts :: keyword() + + # -------------------------------------------------------------------------- + # Client API + # -------------------------------------------------------------------------- + + @doc """ + Run a function with rate limit management. + + Acquires a token from the bucket before executing. If no tokens + are available, blocks until one becomes available (up to queue_timeout). + + ## Parameters + - `opts`: Keyword list with rate limiter options + - `fun`: Zero-arity function to execute + + ## Options + - `:skip` - Set to true to bypass rate limiting (for read operations) + - All other options are passed to the rate limiter GenServer + + ## Returns + Whatever `fun` returns + """ + @spec run(opts(), (() -> result)) :: result when result: var + def run(opts \\ [], fun) do + if Keyword.get(opts, :skip, false) do + fun.() + else + do_run(fun, opts) + end + end + + defp do_run(fun, opts) do + case start_or_reuse(opts) do + {:ok, pid} -> + case acquire_token(pid, opts) do + :ok -> + try do + fun.() + after + # Token is consumed on success; on error we do not refund + # to avoid hammering a failing endpoint + :ok + end + + {:error, :timeout} -> + {:error, :rate_limit_timeout} + + {:error, reason} -> + {:error, reason} + end + + {:error, reason} -> + {:error, reason} + end + end + + @doc """ + Start a named rate limiter bucket for a bot token. + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, opts, name: name) + end + + @doc """ + Get current bucket state (for debugging/monitoring). + """ + @spec get_state(GenServer.server()) :: bucket() + def get_state(pid) do + GenServer.call(pid, :get_state) + end + + @doc """ + Reset the bucket (for testing). + """ + @spec reset(GenServer.server()) :: :ok + def reset(pid) do + GenServer.call(pid, :reset) + end + + # -------------------------------------------------------------------------- + # GenServer Callbacks + # -------------------------------------------------------------------------- + + @impl true + def init(opts) do + bucket_size = Keyword.get(opts, :bucket_size, @default_bucket_size) + refill_rate = Keyword.get(opts, :refill_rate, @default_refill_rate) + refill_interval = Keyword.get(opts, :refill_interval, @default_refill_interval) + queue_timeout = Keyword.get(opts, :queue_timeout, @default_queue_timeout) + + state = %{ + tokens: bucket_size, + max_tokens: bucket_size, + refill_rate: refill_rate, + refill_interval: refill_interval, + queue_timeout: queue_timeout, + last_refill: current_time_ms() + } + + {:ok, state} + end + + @impl true + def handle_call(:get_state, _from, state) do + current = refill_bucket(state) + {:reply, %{tokens: current.tokens}, current} + end + + @impl true + def handle_call(:reset, _from, _state) do + {:reply, :ok, %{tokens: @default_bucket_size, max_tokens: @default_bucket_size, + refill_rate: @default_refill_rate, refill_interval: @default_refill_interval, + queue_timeout: @default_queue_timeout, last_refill: current_time_ms()}} + end + + @impl true + def handle_call(:acquire, _from, state) do + current = refill_bucket(state) + + if current.tokens >= 1.0 do + {:reply, :ok, %{current | tokens: current.tokens - 1.0}} + else + {:reply, {:wait, current.tokens}, current} + end + end + + # -------------------------------------------------------------------------- + # Internal + # -------------------------------------------------------------------------- + + defp acquire_token(pid, opts) do + queue_timeout = Keyword.get(opts, :queue_timeout, @default_queue_timeout) + + case GenServer.call(pid, :acquire, queue_timeout) do + :ok -> :ok + {:wait, _tokens} -> + :timer.sleep(50) + acquire_token(pid, opts) + end + end + + defp refill_bucket(%{tokens: tokens, max_tokens: max, refill_rate: rate, + refill_interval: interval, last_refill: last} = state) do + now = current_time_ms() + elapsed = now - last + + if elapsed >= interval do + cycles = div(elapsed, interval) + new_tokens = min(max, tokens + (cycles * rate)) + %{state | tokens: new_tokens, last_refill: now} + else + state + end + end + + defp current_time_ms do + System.system_time(:millisecond) + end + + defp start_or_reuse(opts) do + # Each token = bot token hash for isolation, global bucket + token = Keyword.get(opts, :token, "default") + name = via_tuple(token) + Process.sleep(0) # allow other processes + + case GenServer.start_link(__MODULE__, opts, name: name) do + {:ok, pid} -> {:ok, pid} + {:error, {:already_started, pid}} -> {:ok, pid} + error -> error + end + rescue + _ -> {:error, :rate_limiter_unavailable} + end + + defp via_tuple(token) do + key = :erlang.phash2(token) + {:via, Registry, {Lux.Lenses.TelegramLens.RateLimiter.Registry, key}} + end +end diff --git a/lib/lux/lenses/telegram/types.ex b/lib/lux/lenses/telegram/types.ex new file mode 100644 index 00000000..634e1324 --- /dev/null +++ b/lib/lux/lenses/telegram/types.ex @@ -0,0 +1,451 @@ +defmodule Lux.Lenses.TelegramLens.Types do + @moduledoc """ + Type definitions for Telegram Bot API entities. + + These types mirror the Telegram Bot API types and are used for + documentation and dialyzer type checking. + """ + + # -------------------------------------------------------------------------- + # User-Facing Types + # -------------------------------------------------------------------------- + + @type user :: %{ + required(:id) => integer, + optional(:is_bot) => boolean, + optional(:first_name) => String.t(), + optional(:last_name) => String.t(), + optional(:username) => String.t(), + optional(:language_code) => String.t() + } + + @type chat :: %{ + required(:id) => integer, + required(:type) => String.t(), # "private", "group", "supergroup", "channel" + optional(:title) => String.t(), + optional(:username) => String.t(), + optional(:first_name) => String.t(), + optional(:last_name) => String.t(), + optional(:description) => String.t(), + optional(:invite_link) => String.t(), + optional(:pinned_message) => message(), + optional(:permissions) => chat_permissions(), + optional(:slow_mode_delay) => integer, + optional(:sticker_set_name) => String.t(), + optional(:can_set_sticker_set) => boolean + } + + @type chat_permissions :: %{ + optional(:can_send_messages) => boolean, + optional(:can_send_media_messages) => boolean, + optional(:can_send_polls) => boolean, + optional(:can_send_other_messages) => boolean, + optional(:can_add_web_page_previews) => boolean, + optional(:can_change_info) => boolean, + optional(:can_invite_users) => boolean, + optional(:can_pin_messages) => boolean + } + + @type message :: %{ + required(:message_id) => integer, + required(:date) => integer, + required(:chat) => chat(), + optional(:from) => user(), + optional(:forward_from) => user(), + optional(:forward_from_chat) => chat(), + optional(:forward_from_message_id) => integer, + optional(:forward_signature) => String.t(), + optional(:forward_date) => integer, + optional(:reply_to_message) => message(), + optional(:via_bot) => user(), + optional(:edit_date) => integer, + optional(:media_group_id) => String.t(), + optional(:author_signature) => String.t(), + optional(:text) => String.t(), + optional(:entities) => [message_entity()], + optional(:caption_entities) => [message_entity()], + optional(:audio) => audio(), + optional(:document) => document(), + optional(:animation) => animation(), + optional(:game) => game(), + optional(:photo) => [photo_size()], + optional(:sticker) => sticker(), + optional(:video) => video(), + optional(:video_note) => video_note(), + optional(:voice) => voice(), + optional(:caption) => String.t(), + optional(:contact) => contact(), + optional(:location) => location(), + optional(:venue) => venue(), + optional(:poll) => poll(), + optional(:new_chat_members) => [user()], + optional(:new_chat_title) => String.t(), + optional(:new_chat_photo) => [photo_size()], + optional(:delete_chat_photo) => boolean, + optional(:group_chat_created) => boolean, + optional(:supergroup_chat_created) => boolean, + optional(:channel_chat_created) => boolean, + optional(:migrate_to_chat_id) => integer, + optional(:migrate_from_chat_id) => integer, + optional(:pinned_message) => message(), + optional(:invoice) => invoice(), + optional(:successful_payment) => successful_payment(), + optional(:connected_website) => String.t(), + optional(:passport_data) => passport_data(), + optional(:reply_markup) => inline_keyboard_markup() + } + + @type message_entity :: %{ + required(:type) => String.t(), + required(:offset) => integer, + required(:length) => integer, + optional(:url) => String.t(), + optional(:user) => user() + } + + @type photo_size :: %{ + required(:file_id) => String.t(), + required(:file_unique_id) => String.t(), + required(:width) => integer, + required(:height) => integer, + optional(:file_size) => integer + } + + @type audio :: %{ + required(:file_id) => String.t(), + required(:file_unique_id) => String.t(), + required(:duration) => integer, + optional(:performer) => String.t(), + optional(:title) => String.t(), + optional(:file_name) => String.t(), + optional(:mime_type) => String.t(), + optional(:file_size) => integer + } + + @type document :: %{ + required(:file_id) => String.t(), + required(:file_unique_id) => String.t(), + optional(:thumbnail) => photo_size(), + optional(:file_name) => String.t(), + optional(:mime_type) => String.t(), + optional(:file_size) => integer + } + + @type animation :: %{ + required(:file_id) => String.t(), + required(:file_unique_id) => String.t(), + required(:width) => integer, + required(:height) => integer, + required(:duration) => integer, + optional(:thumbnail) => photo_size(), + optional(:file_name) => String.t(), + optional(:mime_type) => String.t(), + optional(:file_size) => integer + } + + @type video :: %{ + required(:file_id) => String.t(), + required(:file_unique_id) => String.t(), + required(:width) => integer, + required(:height) => integer, + required(:duration) => integer, + optional(:thumbnail) => photo_size(), + optional(:mime_type) => String.t(), + optional(:file_size) => integer + } + + @type video_note :: %{ + required(:file_id) => String.t(), + required(:file_unique_id) => String.t(), + required(:length) => integer, + required(:duration) => integer, + optional(:thumbnail) => photo_size(), + optional(:file_size) => integer + } + + @type voice :: %{ + required(:file_id) => String.t(), + required(:file_unique_id) => String.t(), + required(:duration) => integer, + optional(:mime_type) => String.t(), + optional(:file_size) => integer + } + + @type contact :: %{ + required(:phone_number) => String.t(), + required(:first_name) => String.t(), + optional(:last_name) => String.t(), + optional(:user_id) => integer, + optional(:vcard) => String.t() + } + + @type location :: %{ + required(:longitude) => float, + required(:latitude) => float, + optional(:horizontal_accuracy) => float, + optional(:live_period) => integer, + optional(:heading) => integer, + optional(:proximity_alert_radius) => integer + } + + @type venue :: %{ + required(:location) => location(), + required(:title) => String.t(), + required(:address) => String.t(), + optional(:foursquare_id) => String.t(), + optional(:foursquare_type) => String.t(), + optional(:google_place_id) => String.t(), + optional(:google_place_type) => String.t() + } + + @type poll :: %{ + required(:id) => String.t(), + required(:question) => String.t(), + required(:options) => [poll_option()], + required(:total_voter_count) => integer, + required(:is_closed) => boolean, + required(:is_anonymous) => boolean, + required(:type) => String.t(), + required(:allows_multiple_answers) => boolean, + optional(:correct_option_id) => integer, + optional(:explanation) => String.t(), + optional(:explanation_entities) => [message_entity()], + optional(:open_period) => integer, + optional(:close_date) => integer + } + + @type poll_option :: %{ + required(:text) => String.t(), + required(:voter_count) => integer + } + + @type game :: %{ + required(:title) => String.t(), + required(:description) => String.t(), + required(:photo) => [photo_size()], + optional(:text) => String.t(), + optional(:text_entities) => [message_entity()], + optional(:animation) => animation() + } + + @type invoice :: %{ + required(:title) => String.t(), + required(:description) => String.t(), + required(:start_parameter) => String.t(), + required(:currency) => String.t(), + required(:total_amount) => integer + } + + @type successful_payment :: %{ + required(:currency) => String.t(), + required(:total_amount) => integer, + required(:invoice_payload) => String.t(), + required(:shipping_option_id) => String.t(), + optional(:order_info) => order_info(), + required(:telegram_payment_charge_id) => String.t(), + required(:provider_payment_charge_id) => String.t() + } + + @type order_info :: %{ + optional(:name) => String.t(), + optional(:phone_number) => String.t(), + optional(:email) => String.t(), + optional(:shipping_address) => shipping_address() + } + + @type shipping_address :: %{ + required(:country_code) => String.t(), + required(:state) => String.t(), + required(:city) => String.t(), + required(:street_line1) => String.t(), + required(:street_line2) => String.t(), + required(:post_code) => String.t() + } + + @type passport_data :: %{ + required(:data) => [encrypted_passport_element()], + required(:credentials) => encrypted_credentials() + } + + @type encrypted_passport_element :: %{ + required(:type) => String.t(), + optional(:data) => String.t(), + optional(:phone_number) => String.t(), + optional(:email) => String.t(), + optional(:files) => [passport_file()], + optional(:front_side) => passport_file(), + optional(:reverse_side) => passport_file(), + optional(:selfie) => passport_file(), + optional(:translation) => [passport_file()], + optional(:hash) => String.t() + } + + @type passport_file :: %{ + required(:file_id) => String.t(), + required(:file_unique_id) => String.t(), + required(:file_size) => integer, + required(:file_date) => integer + } + + @type encrypted_credentials :: %{ + required(:data) => String.t(), + required(:hash) => String.t(), + required(:secret) => String.t() + } + + @type sticker :: %{ + required(:file_id) => String.t(), + required(:file_unique_id) => String.t(), + required(:width) => integer, + required(:height) => integer, + required(:is_animated) => boolean, + required(:is_video) => boolean, + optional(:thumbnail) => photo_size(), + optional(:emoji) => String.t(), + optional(:set_name) => String.t(), + optional(:mask_position) => mask_position(), + optional(:file_size) => integer + } + + @type mask_position :: %{ + required(:point) => String.t(), + required(:scale) => float, + optional(:x_shift) => float, + optional(:y_shift) => float + } + + # -------------------------------------------------------------------------- + # Update Types + # -------------------------------------------------------------------------- + + @type update :: %{ + required(:update_id) => integer, + optional(:message) => message(), + optional(:edited_message) => message(), + optional(:channel_post) => message(), + optional(:edited_channel_post) => message(), + optional(:inline_query) => inline_query(), + optional(:chosen_inline_result) => chosen_inline_result(), + optional(:callback_query) => callback_query(), + optional(:shipping_query) => shipping_query(), + optional(:pre_checkout_query) => pre_checkout_query(), + optional(:poll) => poll(), + optional(:poll_answer) => poll_answer() + } + + @type inline_query :: %{ + required(:id) => String.t(), + required(:from) => user(), + required(:query) => String.t(), + required(:offset) => String.t(), + optional(:chat_type) => String.t(), + optional(:location) => location() + } + + @type chosen_inline_result :: %{ + required(:result_id) => String.t(), + required(:from) => user(), + required(:query) => String.t(), + optional(:location) => location(), + optional(:inline_message_id) => String.t() + } + + @type callback_query :: %{ + required(:id) => String.t(), + required(:from) => user(), + optional(:message) => message(), + optional(:inline_message_id) => String.t(), + optional(:chat_instance) => String.t(), + optional(:chat) => chat(), + optional(:date) => integer, + optional(:game_short_name) => String.t(), + optional(:data) => String.t() + } + + @type shipping_query :: %{ + required(:id) => String.t(), + required(:from) => user(), + required(:invoice_payload) => String.t(), + required(:shipping_address) => shipping_address() + } + + @type pre_checkout_query :: %{ + required(:id) => String.t(), + required(:from) => user(), + required(:currency) => String.t(), + required(:total_amount) => integer, + required(:invoice_payload) => String.t(), + optional(:shipping_option_id) => String.t(), + optional(:order_info) => order_info() + } + + @type poll_answer :: %{ + required(:poll_id) => String.t(), + required(:user) => user(), + required(:option_ids) => [integer] + } + + # -------------------------------------------------------------------------- + # Webhook Types + # -------------------------------------------------------------------------- + + @type webhook_info :: %{ + required(:url) => String.t(), + required(:has_custom_certificate) => boolean, + required(:pending_update_count) => integer, + optional(:ip_address) => String.t(), + optional(:last_error_date) => integer, + optional(:last_error_message) => String.t(), + optional(:max_connections) => integer, + optional(:allowed_updates) => [String.t()] + } + + # -------------------------------------------------------------------------- + # Input Types + # -------------------------------------------------------------------------- + + @type input_file :: %{ + required(:path) => String.t(), + optional(:file_name) => String.t(), + optional(:mime_type) => String.t() + } + + @type inline_keyboard_button :: %{ + required(:text) => String.t(), + optional(:url) => String.t(), + optional(:callback_data) => String.t(), + optional(:callback_game) => map(), + optional(:switch_inline_query) => String.t(), + optional(:switch_inline_query_current_chat) => String.t(), + optional(:pay) => boolean + } + + @type inline_keyboard_markup :: %{ + required(:inline_keyboard) => [[inline_keyboard_button()]] + } + + @type reply_keyboard_markup :: %{ + required(:keyboard) => [[keyboard_button()]], + optional(:resize_keyboard) => boolean, + optional(:one_time_keyboard) => boolean, + optional(:input_field_placeholder) => String.t(), + optional(:selective) => boolean + } + + @type keyboard_button :: %{ + required(:text) => String.t(), + optional(:request_contact) => boolean, + optional(:request_location) => boolean, + optional(:request_poll) => keyboard_button_poll_type() + } + + @type keyboard_button_poll_type :: %{ + optional(:type) => String.t() + } + + @type force_reply :: %{ + required(:force_reply) => boolean, + optional(:input_field_placeholder) => String.t(), + optional(:selective) => boolean + } +end diff --git a/lib/lux/lenses/telegram/webhook.ex b/lib/lux/lenses/telegram/webhook.ex new file mode 100644 index 00000000..b9edfb53 --- /dev/null +++ b/lib/lux/lenses/telegram/webhook.ex @@ -0,0 +1,113 @@ +defmodule Lux.Lenses.TelegramLens.Webhook do + @moduledoc """ + Webhook server for receiving Telegram updates. + + This module provides a Plug-based webhook endpoint that receives + updates from Telegram and dispatches them to a configured handler. + + ## Usage + + plug Lux.Lenses.TelegramLens.Webhook, + handler: {MyBot, :handle_update}, + secret: System.get_env("TELEGRAM_SECRET") + + ## Security + + All incoming requests are validated using the HMAC signature + sent by Telegram in the `X-Telegram-Bot-Api-Secret-Token` header. + + ## Configuration + + Set your secret token in config: + + config :lux, :telegram, + webhook_secret: "my_webhook_secret" + """ + + import Plug.Conn + + @behaviour Plug + + @impl true + def init(opts) do + handler = Keyword.fetch!(opts, :handler) + secret = Keyword.get(opts, :secret, Application.get_env(:lux, :telegram, [])[:webhook_secret]) + path = Keyword.get(opts, :path, "/telegram/webhook") + + %{ + handler: handler, + secret: secret, + path: path + } + end + + @impl true + def call(%Plug.Conn{method: "POST", path_info: path_info} = conn, %{path: path} = config) do + if List.starts_with?(path_info, path_segments(path)) do + do_webhook(conn, config) + else + conn + end + end + + def call(conn, _config), do: conn + + defp do_webhook(conn, config) do + with {:ok, body, conn} <- read_body(conn), + {:ok, update} <- Jason.decode(body), + :ok <- validate_secret(conn, config), + :ok <- dispatch_update(update, config) do + send_resp(conn, 200, "ok") + else + {:error, :invalid_signature} -> + send_resp(conn, 401, "unauthorized") + + {:error, :invalid_payload} -> + send_resp(conn, 400, "bad request") + + {:error, reason} -> + Logger.error("Webhook error: #{inspect(reason)}") + send_resp(conn, 500, "internal error") + end + end + + defp validate_secret(%Plug.Conn{} = conn, %{secret: nil}) do + :ok + end + + defp validate_secret(%Plug.Conn{} = conn, %{secret: secret}) do + received = get_req_header(conn, "x-telegram-bot-api-secret-token") |> List.first() + + if valid_signature?(secret, received) do + :ok + else + {:error, :invalid_signature} + end + end + + defp valid_signature?(_secret, nil), do: false + defp valid_signature?(secret, token) do + :crypto.hmac(:sha256, secret, token) == token + rescue + _ -> false + end + + defp dispatch_update(update, %{handler: {mod, fun}}) do + try do + apply(mod, fun, [update]) + :ok + rescue + e -> + Logger.error("Handler error: #{inspect(e)}") + {:error, e} + end + end + + defp path_segments("/" <> path) do + String.split(path, "/", trim: true) + end + + defp path_segments(path) do + String.split(path, "/", trim: true) + end +end diff --git a/lib/lux/lenses/telegram_lens.ex b/lib/lux/lenses/telegram_lens.ex new file mode 100644 index 00000000..9ecbb51a --- /dev/null +++ b/lib/lux/lenses/telegram_lens.ex @@ -0,0 +1,761 @@ +defmodule Lux.Lenses.TelegramLens do + @moduledoc """ + Complete Telegram Bot API Lens for Lux. + + A Lens provides a fluent, composable interface over Lux's HTTP client + (`Client`) and rate-limiting infrastructure (`RateLimiter`). + + ## Usage + + alias Lux.Lenses.TelegramLens, as: T + + # Without options — uses the default token from config + T.get_me(config) + T.send_message(config, chat_id, "Hello!") + + # With override options (per-request token, timeout, etc.) + T.send_message(config, chat_id, "Hello!", parse_mode: "Markdown") + T.send_photo(config, chat_id, photo_input, caption: "Look!") + + ## Design Notes + + - Every public function accepts `config :: Lux.Client.Config.t()` as its first + argument. The config carries the bot token, default headers, adapter opts, etc. + - All API calls go through `Client.request/3` so they benefit from retry, logging, + telemetry, and the configured HTTP adapter (Finch by default). + - `RateLimiter.run/2` is wrapped around every mutating write call so that burst + sending does not trigger Telegram's 30 msg/s flood limit. + - Helper functions (`put_chat_id`, `put_optional`, etc.) live in the private + section and are the single canonical place where payload maps are assembled. + - Inline keyboard helpers (`inline_keyboard/1`, `button/3`) return plain maps that + can be embedded in any send/edit payload. + - `focus/2` is a tiny combinator — it threads the current state through an + arbitrary function — useful when building larger pipelines or test fixtures. + """ + + # --------------------------------------------------------------------------- + # Dependencies + # --------------------------------------------------------------------------- + + alias Lux.{Client, RateLimiter} + + # --------------------------------------------------------------------------- + # Compile-time constants + # --------------------------------------------------------------------------- + + @base_url "https://api.telegram.org" + + # --------------------------------------------------------------------------- + # Public API — Identity / Bot info + # --------------------------------------------------------------------------- + + @doc """ + A call to `getMe`. + + Returns basic information about the bot in the form of a `User` object. + + ## Example + + T.get_me(config) + |> Lux.Client.request() + #=> {:ok, %{id: 123456789, is_bot: true, first_name: "MyBot", …}} + + """ + @spec get_me(Client.config()) :: Client.result(map()) + def get_me(config) do + Client.request(config, :get, "/bot#{bot_token(config)}/getMe") + end + + # --------------------------------------------------------------------------- + # Public API — Sending messages + # --------------------------------------------------------------------------- + + @doc """ + A call to `sendMessage`. + + Sends a text message to the given `chat_id`. All Telegram send options are + available via the optional keyword list. + + ## Required arguments + + - `chat_id` — integer or binary (`"@channel"` / `"123456789"`) + + ## Optional keys (passed as a keyword list) + + - `:parse_mode` — `\"HTML\"` | `\"Markdown\"` | `\"MarkdownV2\"` + - `:disable_web_page_preview` + - `:disable_notification` + - `:reply_to_message_id` + - `:allow_sending_without_reply` + - `:reply_markup` — a pre-built reply markup map (e.g. from `inline_keyboard/1`) + + ## Example + + T.send_message(config, 123456789, \"Hello *world*!\", parse_mode: \"Markdown\") + + """ + @spec send_message(Client.config(), Client.chat_id(), String.t(), keyword()) :: + Client.result(map()) + def send_message(config, chat_id, text, opts \\ []) do + payload = + %{} + |> put_chat_id(chat_id) + |> put_optional(:text, text) + |> put_parse_mode(opts) + |> put_optional(:disable_web_page_preview, opts, :disable_web_page_preview) + |> put_optional(:disable_notification, opts, :disable_notification) + |> put_optional(:reply_to_message_id, opts, :reply_to_message_id) + |> put_optional(:allow_sending_without_reply, opts, :allow_sending_without_reply) + |> put_optional(:reply_markup, opts, :reply_markup) + + RateLimiter.run(config, fn -> + Client.request(config, :post, "/bot#{bot_token(config)}/sendMessage", payload) + end) + end + + @doc """ + A call to `forwardMessage`. + + Forwards a message from one chat to another without triggering a new + rate-limit bucket — forwarding is cheap. + + ## Arguments + + - `chat_id` — destination chat + - `from_chat_id` — source chat + - `message_id` — message id in the source chat + + """ + @spec forward_message(Client.config(), Client.chat_id(), Client.chat_id(), integer(), keyword()) :: + Client.result(map()) + def forward_message(config, chat_id, from_chat_id, message_id, opts \\ []) do + payload = + %{} + |> put_chat_id(chat_id) + |> put_optional(:from_chat_id, from_chat_id) + |> put_optional(:message_id, message_id) + |> put_optional(:disable_notification, opts, :disable_notification) + + # Forward is read-like from a rate-limit perspective, but Telegram's + # flood rules apply to the *destination* chat, so we rate-limit it too. + RateLimiter.run(config, fn -> + Client.request(config, :post, "/bot#{bot_token(config)}/forwardMessage", payload) + end) + end + + # --------------------------------------------------------------------------- + # Public API — Editing messages + # --------------------------------------------------------------------------- + + @doc """ + A call to `editMessageText`. + + Edits the text of an inline message or a message sent by the bot. + + The four identification arguments are mutually exclusive: + + - pass `chat_id` + `message_id` for a regular chat message + - pass only `inline_message_id` for an inline query answer + + ## Optional keys + + - `:parse_mode` + - `:disable_web_page_preview` + - `:reply_markup` + + """ + @spec edit_message_text(Client.config(), integer(), integer() | nil, String.t() | nil, keyword()) :: + Client.result(map()) + def edit_message_text(config, chat_id, message_id, text, opts \\ []) do + payload = + %{} + |> put_message_id(chat_id, message_id) + |> put_inline_message_id(opts) + |> put_optional(:text, text) + |> put_parse_mode(opts) + |> put_optional(:disable_web_page_preview, opts, :disable_web_page_preview) + |> put_optional(:reply_markup, opts, :reply_markup) + + RateLimiter.run(config, fn -> + Client.request(config, :post, "/bot#{bot_token(config)}/editMessageText", payload) + end) + end + + @doc """ + A call to `editMessageCaption`. + + Edits the caption of a message sent by the bot (or an inline message). + + ## Optional keys + + - `:caption` — new caption text + - `:parse_mode` + - `:reply_markup` + + Identification is identical to `edit_message_text/5`. + + """ + @spec edit_message_caption(Client.config(), integer() | nil, integer() | nil, keyword()) :: + Client.result(map()) + def edit_message_caption(config, chat_id, message_id, opts \\ []) do + payload = + %{} + |> put_message_id(chat_id, message_id) + |> put_inline_message_id(opts) + |> put_optional(:caption, opts, :caption) + |> put_parse_mode(opts) + |> put_optional(:reply_markup, opts, :reply_markup) + + RateLimiter.run(config, fn -> + Client.request(config, :post, "/bot#{bot_token(config)}/editMessageCaption", payload) + end) + end + + # --------------------------------------------------------------------------- + # Public API — Deleting messages + # --------------------------------------------------------------------------- + + @doc """ + A call to `deleteMessage`. + + Deletes a message. Note that bots can only delete messages that were sent + by the bot itself or in a supergroup. + + """ + @spec delete_message(Client.config(), integer()) :: Client.result(boolean()) + def delete_message(config, message_id) do + payload = %{message_id: message_id} + + RateLimiter.run(config, fn -> + Client.request(config, :post, "/bot#{bot_token(config)}/deleteMessage", payload) + end) + end + + # --------------------------------------------------------------------------- + # Public API — Sending media + # --------------------------------------------------------------------------- + + @doc """ + A call to `sendPhoto`. + + Sends a photo. `photo` can be a: + + - URL string (`"https://…/photo.jpg"`) + - local file path (`"/tmp/photo.jpg"` or `"C:\\Photos\\photo.jpg"`) + - `{:file, file_id}` tuple (an already-uploaded file reference) + + ## Optional keys + + - `:caption` + - `:parse_mode` + - `:disable_notification` + - `:reply_to_message_id` + - `:reply_markup` + + """ + @spec send_photo(Client.config(), Client.chat_id(), Client.input_file(), keyword()) :: + Client.result(map()) + def send_photo(config, chat_id, photo, opts \\ []) do + send_media(config, chat_id, "photo", photo, opts) + end + + @doc """ + A call to `sendDocument`. + + Sends a general file. Same input conventions as `send_photo/3`. + + ## Optional keys + + - `:caption` + - `:parse_mode` + - `:disable_notification` + - `:reply_to_message_id` + - `:reply_markup` + - `:thumb` — thumbnail image (URL, path, or `{:file, id}`) + + """ + @spec send_document(Client.config(), Client.chat_id(), Client.input_file(), keyword()) :: + Client.result(map()) + def send_document(config, chat_id, document, opts \\ []) do + send_media(config, chat_id, "document", document, opts) + end + + @doc """ + A call to `sendVoice`. + + Sends an audio file. Telegram will display it as a voice message. + Accepts the same inputs as `send_photo/3`. + + ## Optional keys + + - `:caption` + - `:parse_mode` + - `:duration` + - `:performer` + - `:title` + - `:disable_notification` + - `:reply_to_message_id` + - `:reply_markup` + + """ + @spec send_voice(Client.config(), Client.chat_id(), Client.input_file(), keyword()) :: + Client.result(map()) + def send_voice(config, chat_id, voice, opts \\ []) do + send_media(config, chat_id, "voice", voice, opts) + end + + @doc """ + A call to `sendVideo`. + + Sends a video. Accepts the same inputs as `send_photo/3`. + + ## Optional keys + + - `:caption` + - `:parse_mode` + - `:duration` + - `:width` + - `:height` + - `:disable_notification` + - `:reply_to_message_id` + - `:reply_markup` + - `:thumb` + + """ + @spec send_video(Client.config(), Client.chat_id(), Client.input_file(), keyword()) :: + Client.result(map()) + def send_video(config, chat_id, video, opts \\ []) do + send_media(config, chat_id, "video", video, opts) + end + + # --------------------------------------------------------------------------- + # Public API — Chat queries + # --------------------------------------------------------------------------- + + @doc """ + A call to `getChat`. + + Returns information about a chat. `chat_id` can be an integer, a username + (string starting with `@`), or a `channel:` / `group:` link. + + """ + @spec get_chat(Client.config(), Client.chat_id()) :: Client.result(map()) + def get_chat(config, chat_id) do + payload = put_chat_id(%{}, chat_id) + Client.request(config, :get, "/bot#{bot_token(config)}/getChat", payload) + end + + @doc """ + A call to `getChatMemberCount`. + + Returns the number of members in a chat. + + """ + @spec get_chat_member_count(Client.config(), Client.chat_id()) :: Client.result(non_neg_integer()) + def get_chat_member_count(config, chat_id) do + payload = put_chat_id(%{}, chat_id) + Client.request(config, :get, "/bot#{bot_token(config)}/getChatMemberCount", payload) + end + + # --------------------------------------------------------------------------- + # Public API — Updates (long-polling) + # --------------------------------------------------------------------------- + + @doc """ + A call to `getUpdates`. + + This is the underlying long-poll driver. Pass `offset` and `timeout` + to control the polling loop. The response contains an array of `Update` + objects. + + ## Optional keys + + - `:offset` — pass the last `update_id + 1` to acknowledge processed updates + - `:limit` — 1–100 (default 100) + - `:timeout` — long-poll timeout in seconds (0–100, default 0) + - `:allowed_updates` — list of update types to receive + + """ + @spec get_updates(Client.config(), keyword()) :: Client.result([map()]) + def get_updates(config, opts \\ []) do + payload = + %{} + |> maybe_put(:offset, opts, :offset) + |> maybe_put(:limit, opts, :limit) + |> maybe_put(:timeout, opts, :timeout) + |> maybe_put(:allowed_updates, opts, :allowed_updates) + + Client.request(config, :get, "/bot#{bot_token(config)}/getUpdates", payload) + end + + # --------------------------------------------------------------------------- + # Public API — Webhook management + # --------------------------------------------------------------------------- + + @doc """ + A call to `setWebhook`. + + Registers `url` as the webhook endpoint for this bot. Telegram will then + push `Update` objects to that URL instead of the bot having to poll. + + ## Optional keys + + - `:certificate` — an `{:file, path}` tuple pointing at your PEM cert + - `:max_connections` + - `:allowed_updates` + - `:drop_pending_updates` + - `:secret_token` — a secret string that will be sent in the + `X-Telegram-Bot-Api-Secret-Token` header + + """ + @spec set_webhook(Client.config(), String.t(), keyword()) :: Client.result(boolean()) + def set_webhook(config, url, opts \\ []) do + payload = + %{} + |> put_optional(:url, url) + |> maybe_put(:certificate, opts, :certificate) + |> maybe_put(:max_connections, opts, :max_connections) + |> maybe_put(:allowed_updates, opts, :allowed_updates) + |> maybe_put(:drop_pending_updates, opts, :drop_pending_updates) + |> maybe_put(:secret_token, opts, :secret_token) + + Client.request(config, :post, "/bot#{bot_token(config)}/setWebhook", payload) + end + + @doc """ + A call to `deleteWebhook`. + + Removes the webhook integration. After this call the bot returns to + getUpdates-based polling. + + ## Optional keys + + - `:drop_pending_updates` — pass `true` to discard pending updates + + """ + @spec delete_webhook(Client.config(), keyword()) :: Client.result(boolean()) + def delete_webhook(config, opts \\ []) do + payload = maybe_put(%{}, :drop_pending_updates, opts, :drop_pending_updates) + Client.request(config, :post, "/bot#{bot_token(config)}/deleteWebhook", payload) + end + + @doc """ + A call to `getWebhookInfo`. + + Returns current webhook status and debug info. + + """ + @spec get_webhook_info(Client.config()) :: Client.result(map()) + def get_webhook_info(config) do + Client.request(config, :get, "/bot#{bot_token(config)}/getWebhookInfo") + end + + # --------------------------------------------------------------------------- + # Public API — Polls + # --------------------------------------------------------------------------- + + @doc """ + A call to `sendPoll`. + + Sends a native Telegram poll to the given chat. + + ## Required arguments + + - `chat_id` — destination chat + - `question` — poll question text + - `options` — list of binary option strings (2–10 options) + + ## Optional keys + + - `:is_anonymous` — default `true` + - `:type` — `\"regular\"` | `\"quiz\"` + - `:allows_multiple_answers` + - `:correct_option_id` — required when `:type` is `\"quiz\"` + - `:explanation` + - `:explanation_parse_mode` + - `:open_period` — 5–600 seconds + - `:close_date` — Unix timestamp (integer) + - `:is_closed` + - `:disable_notification` + - `:reply_to_message_id` + - `:reply_markup` + + """ + @spec send_poll(Client.config(), Client.chat_id(), String.t(), [String.t()], keyword()) :: + Client.result(map()) + def send_poll(config, chat_id, question, options, opts \\ []) when is_list(options) do + payload = + %{} + |> put_chat_id(chat_id) + |> put_optional(:question, question) + |> put_optional(:options, options) + |> put_parse_mode(opts) + |> maybe_put(:is_anonymous, opts, :is_anonymous) + |> maybe_put(:type, opts, :type) + |> maybe_put(:allows_multiple_answers, opts, :allows_multiple_answers) + |> maybe_put(:correct_option_id, opts, :correct_option_id) + |> maybe_put(:explanation, opts, :explanation) + |> maybe_put(:explanation_parse_mode, opts, :explanation_parse_mode) + |> maybe_put(:open_period, opts, :open_period) + |> maybe_put(:close_date, opts, :close_date) + |> maybe_put(:is_closed, opts, :is_closed) + |> maybe_put(:disable_notification, opts, :disable_notification) + |> maybe_put(:reply_to_message_id, opts, :reply_to_message_id) + |> maybe_put(:reply_markup, opts, :reply_markup) + + RateLimiter.run(config, fn -> + Client.request(config, :post, "/bot#{bot_token(config)}/sendPoll", payload) + end) + end + + @doc """ + A call to `closePoll`. + + Stops a poll previously sent by the bot. + + ## Arguments + + - `chat_id` — the chat where the poll was sent + - `message_id` — the original poll message id + + """ + @spec close_poll(Client.config(), integer(), keyword()) :: Client.result(map()) + def close_poll(config, message_id, opts \\ []) do + payload = + %{} + |> put_optional(:chat_id, opts, :chat_id) + |> put_optional(:message_id, message_id) + + RateLimiter.run(config, fn -> + Client.request(config, :post, "/bot#{bot_token(config)}/closePoll", payload) + end) + end + + # --------------------------------------------------------------------------- + # Public API — Inline keyboard helpers + # --------------------------------------------------------------------------- + + @doc """ + Builds an inline keyboard from a list of button rows. + + Each row is a list of `button/3` results. + + ## Example + + keyboard = + T.inline_keyboard([ + [T.button("Google", url: "https://google.com"), + T.button("Callback", callback_data: "my_action:do_it")], + [T.button("Switch inline", switch_inline_query: "search")] + ]) + + T.send_message(config, chat_id, "Pick one:", reply_markup: keyboard) + + """ + @spec inline_keyboard([[map()]]) :: %{inline_keyboard: [[map()]]} + def inline_keyboard(rows) when is_list(rows) and is_list(hd(rows)) do + %{inline_keyboard: rows} + end + + @doc """ + Builds a single inline keyboard button. + + One of the following keyword pairs **must** be provided (or passed as + positional arguments): + + | key | description | + |--------------------|-------------| + | `:url` | HTTP/HTTPS URL to open | + | `:callback_data` | Data sent back in a `callback_query` | + | `:switch_inline_query` | Starts inline query for the current user | + | `:switch_inline_query_current_chat` | Same but pre-filled with current chat | + | `:callback_game` | Launches a game (no data — use with `callback_data` manually) | + + Text is always the first positional argument. + + ## Examples + + button("Click me", callback_data: "myapp:action") + button("Open", url: "https://example.com") + button("Search", switch_inline_query: "query") + + """ + @spec button(String.t(), keyword()) :: map() + def button(text, data \\ []) do + map = %{text: text} + + case data do + [url: v] -> Map.put(map, :url, v) + [callback_data: v] -> Map.put(map, :callback_data, v) + [switch_inline_query: v] -> Map.put(map, :switch_inline_query, v) + [switch_inline_query_current_chat: v] -> Map.put(map, :switch_inline_query_current_chat, v) + [callback_game: v] -> Map.put(map, :callback_game, v) + _ -> map + end + end + + # --------------------------------------------------------------------------- + # Public API — Focus (combinator) + # --------------------------------------------------------------------------- + + @doc """ + Focus threads `state` through an arbitrary function. + + This exists primarily to support pipeline-based testing and composition: + + config + |> T.focus(& &1) # identity + |> T.send_message(chat_id, "Hi!") # then send + + In practice, `send_*` functions are called directly, but `focus/2` is useful + when building fixtures or when a lens needs to be "peeled back" to reveal + its underlying state. + + """ + @spec focus(Client.config(), (Client.config() -> Client.config())) :: Client.config() + def focus(config, fun) when is_function(fun, 1) do + fun.(config) + end + + # --------------------------------------------------------------------------- + # Private helpers + # --------------------------------------------------------------------------- + + # --------------------------------------------------------------------------- + # put_parse_mode/2 + # --------------------------------------------------------------------------- + + defp put_parse_mode(payload, opts) do + maybe_put(payload, :parse_mode, opts, :parse_mode) + end + + # --------------------------------------------------------------------------- + # put_optional/3 — inject a value from opts into payload when present + # --------------------------------------------------------------------------- + + defp put_optional(payload, _key, nil), do: payload + defp put_optional(payload, _key, []), do: payload + + defp put_optional(payload, key, value) when is_atom(key) do + Map.put(payload, key, value) + end + + # Variant that reads from opts keyword list + defp put_optional(payload, key, opts, opt_key) do + case Keyword.fetch(opts, opt_key) do + {:ok, v} -> Map.put(payload, key, v) + :error -> payload + end + end + + # --------------------------------------------------------------------------- + # put_chat_id/2 + # --------------------------------------------------------------------------- + + defp put_chat_id(payload, chat_id) when is_binary(chat_id) or is_integer(chat_id) do + Map.put(payload, :chat_id, chat_id) + end + + # --------------------------------------------------------------------------- + # put_message_id/3 + # --------------------------------------------------------------------------- + + # When chat_id is nil the whole block is skipped (inline message path) + defp put_message_id(payload, nil, _message_id), do: payload + + defp put_message_id(payload, chat_id, message_id) do + payload + |> Map.put(:chat_id, chat_id) + |> Map.put(:message_id, message_id) + end + + # --------------------------------------------------------------------------- + # put_inline_message_id/2 + # --------------------------------------------------------------------------- + + defp put_inline_message_id(payload, opts) do + case Keyword.fetch(opts, :inline_message_id) do + {:ok, v} -> Map.put(payload, :inline_message_id, v) + :error -> payload + end + end + + # --------------------------------------------------------------------------- + # put_media/4 (internal shared helper for send_photo/send_document/etc.) + # --------------------------------------------------------------------------- + + defp put_media(payload, _key, nil), do: payload + defp put_media(payload, _key, ""), do: payload + + defp put_media(payload, key, value) when is_binary(value) do + Map.put(payload, key, value) + end + + # --------------------------------------------------------------------------- + # maybe_put/3 — inject into payload only when the value is truthy and not nil + # --------------------------------------------------------------------------- + + defp maybe_put(payload, _key, nil) do + payload + end + + defp maybe_put(payload, key, opts, opt_key) do + case Keyword.fetch(opts, opt_key) do + {:ok, v} when v != nil -> Map.put(payload, key, v) + _ -> payload + end + end + + # --------------------------------------------------------------------------- + # Internal — shared send_media for photo / document / voice / video + # --------------------------------------------------------------------------- + + # This module attribute holds the per-method optional keys so that each + # send_* variant stays DRY. + @media_optional_keys [ + :caption, + :parse_mode, + :duration, + :width, + :height, + :thumb, + :disable_notification, + :reply_to_message_id, + :reply_markup, + :performer, + :title + ] + + defp send_media(config, chat_id, media_type, media, opts) do + payload = + %{} + |> put_chat_id(chat_id) + |> put_media(String.to_atom(media_type), media) + |> put_optional(:caption, opts, :caption) + |> put_parse_mode(opts) + |> put_optional(:duration, opts, :duration) + |> put_optional(:width, opts, :width) + |> put_optional(:height, opts, :height) + |> put_optional(:thumb, opts, :thumb) + |> put_optional(:disable_notification, opts, :disable_notification) + |> put_optional(:reply_to_message_id, opts, :reply_to_message_id) + |> put_optional(:reply_markup, opts, :reply_markup) + |> put_optional(:performer, opts, :performer) + |> put_optional(:title, opts, :title) + + endpoint = "/bot#{bot_token(config)}/send#{String.upcase(media_type)}" + + RateLimiter.run(config, fn -> + Client.request(config, :post, endpoint, payload) + end) + end + + # --------------------------------------------------------------------------- + # Internal — resolve bot token from config + # --------------------------------------------------------------------------- + + # Compile-time placeholder — will be replaced by mixcompile-time or runtime + # resolution once the Lux.Client.Config struct is finalised. + defp bot_token(%Client.Config{token: token}), do: token + defp bot_token(_), do: raise("TelegramLens requires a Lux.Client.Config with a :token field") +end diff --git a/lib/lux/llm.ex b/lib/lux/llm.ex new file mode 100644 index 00000000..48a723a7 --- /dev/null +++ b/lib/lux/llm.ex @@ -0,0 +1,62 @@ +defmodule Lux.LLM do + @moduledoc """ + A module for interacting with LLMs. Defines the behaviours for LLMs + and provides a default implementation. + + Also provides the high-level `chat/3` function powered by the + abstraction layer (Lux.LLM.Manager) for automatic routing, fallback, + caching, and cost tracking. + """ + + defmodule Response do + @moduledoc "A response from an LLM." + + @type t :: %__MODULE__{ + content: String.t() | nil, + tool_calls: [%{type: String.t(), name: String.t(), params: map()}], + finish_reason: String.t() | nil, + structured_output: map() | nil + } + + defstruct content: nil, + tool_calls: [], + finish_reason: nil, + structured_output: nil + end + + @type prompt :: String.t() + @type tools :: [Lux.Prism.t() | Lux.Beam.t() | Lux.Lens.t()] + @type options :: map() | keyword() + + @callback call(prompt(), tools(), options()) :: {:ok, Response.t()} | {:error, String.t()} + + @default_module Application.compile_env(:lux, [Lux.LLM, :default_module], Lux.LLM.OpenAI) + + defdelegate call(prompt, tools, options), to: @default_module + + # ── High-level API (abstraction layer) ── + + @doc """ + Send a chat request with automatic provider routing, fallback, and cost tracking. + + Wraps `Lux.LLM.Manager.chat/3`. See its docs for available options. + """ + @spec chat(String.t() | [map()], tools(), keyword()) :: {:ok, Response.t()} | {:error, String.t()} + defdelegate chat(prompt, tools \\ [], opts \\ []), to: Lux.LLM.Manager + + @doc "Configure the abstraction layer at runtime." + @spec configure(keyword()) :: :ok + defdelegate configure(opts), to: Lux.LLM.Manager + + @doc "Get a usage report across all providers." + @spec usage_report() :: [map()] + defdelegate usage_report(), to: Lux.LLM.Manager + + @doc "Get the total estimated cost." + @spec total_cost() :: float() + defdelegate total_cost(), to: Lux.LLM.Manager + + @doc "Get cache hit/miss statistics." + @spec cache_stats() :: map() + defdelegate cache_stats(), to: Lux.LLM.Manager +end diff --git a/lib/lux/llm/cache.ex b/lib/lux/llm/cache.ex new file mode 100644 index 00000000..637b3123 --- /dev/null +++ b/lib/lux/llm/cache.ex @@ -0,0 +1,165 @@ +defmodule Lux.LLM.Cache do + @moduledoc """ + ETS-based LRU response cache with TTL support. + + Uses a simple list-based LRU tracking and ETS for storage. + Cache keys are derived from prompt + model + temperature. + """ + + @cache_table :lux_llm_cache + @lru_table :lux_llm_cache_lru + + @default_max_size 1000 + @default_ttl_ms 300_000 + + defstruct max_size: @default_max_size, ttl_ms: @default_ttl_ms + + @doc "Start the cache (creates ETS tables)." + @spec start_link(keyword()) :: :ok + def start_link(opts \\ []) do + if :ets.whereis(@cache_table) == :undefined do + :ets.new(@cache_table, [:set, :public, :named_table, read_concurrency: true]) + end + if :ets.whereis(@lru_table) == :undefined do + :ets.new(@lru_table, [:set, :public, :named_table]) + end + :ok + end + + @doc """ + Fetch from cache, or compute and cache the result. + Returns `{:ok, response, :cached}` or `{:ok, response, :computed}` or `{:error, reason}`. + """ + @spec fetch_or_compute(String.t(), module(), String.t(), number(), (-> {:ok, Lux.LLM.Response.t()} | {:error, String.t()})) :: + {:ok, Lux.LLM.Response.t(), :cached | :computed} | {:error, String.t()} + def fetch_or_compute(prompt, provider, model, temperature, compute_fn) do + key = cache_key(prompt, provider, model, temperature) + + case :ets.lookup(@cache_table, key) do + [{^key, response, expires_at}] -> + if System.monotonic_time(:millisecond) < expires_at do + touch_lru(key) + {:ok, response, :cached} + else + :ets.delete(@cache_table, key) + delete_lru(key) + compute_and_cache(key, compute_fn) + end + [] -> + compute_and_cache(key, compute_fn) + end + end + + @doc "Look up a cached response." + @spec get(String.t(), module(), String.t(), number()) :: {:ok, Lux.LLM.Response.t()} | :miss + def get(prompt, provider, model, temperature) do + key = cache_key(prompt, provider, model, temperature) + case :ets.lookup(@cache_table, key) do + [{^key, response, expires_at}] -> + if System.monotonic_time(:millisecond) < expires_at do + touch_lru(key) + {:ok, response} + else + :ets.delete(@cache_table, key) + :miss + end + [] -> :miss + end + end + + @doc "Manually insert into cache." + @spec put(String.t(), module(), String.t(), number(), Lux.LLM.Response.t(), keyword()) :: :ok + def put(prompt, provider, model, temperature, response, opts \\ []) do + ttl = Keyword.get(opts, :ttl_ms, @default_ttl_ms) + key = cache_key(prompt, provider, model, temperature) + evict_if_full() + :ets.insert(@cache_table, {key, response, System.monotonic_time(:millisecond) + ttl}) + touch_lru(key) + :ok + end + + @doc "Invalidate all cache entries." + @spec invalidate_all() :: :ok + def invalidate_all do + :ets.delete_all_objects(@cache_table) + :ets.delete_all_objects(@lru_table) + :ok + end + + @doc "Cache statistics." + @spec stats() :: %{size: non_neg_integer(), hits: non_neg_integer(), misses: non_neg_integer()} + def stats do + size = :ets.info(@cache_table, :size) + hits = case :ets.lookup(@lru_table, :hits) do [{_, h}] -> h; [] -> 0 end + misses = case :ets.lookup(@lru_table, :misses) do [{_, m}] -> m; [] -> 0 end + %{size: size, hits: hits, misses: misses, hit_rate: if(hits + misses > 0, do: hits / (hits + misses), else: 0.0)} + end + + @spec reset_stats() :: :ok + def reset_stats do + :ets.insert(@lru_table, {:hits, 0}) + :ets.insert(@lru_table, {:misses, 0}) + :ok + end + + # ── Internal ── + + defp cache_key(prompt, provider, model, temperature) do + :erlang.phash2({prompt, provider, model, temperature}) + end + + defp compute_and_cache(key, compute_fn) do + record_miss() + case compute_fn.() do + {:ok, response} -> + evict_if_full() + :ets.insert(@cache_table, {key, response, System.monotonic_time(:millisecond) + @default_ttl_ms}) + touch_lru(key) + {:ok, response, :computed} + {:error, reason} -> + {:error, reason} + end + end + + defp touch_lru(key) do + record_hit() + # Simple LRU: delete old entry and re-insert (moves to "end") + :ets.delete(@lru_table, {:lru, key}) + :ets.insert(@lru_table, {:lru, key}) + end + + defp delete_lru(key) do + :ets.delete(@lru_table, {:lru, key}) + end + + defp evict_if_full do + size = :ets.info(@cache_table, :size) + if size >= @default_max_size do + # Evict oldest entries (first inserted LRU keys) + case :ets.first(@lru_table) do + :"$end_of_table" -> :ok + {:lru, old_key} -> + :ets.delete(@lru_table, {:lru, old_key}) + :ets.delete(@cache_table, old_key) + :ok + _ -> :ok + end + end + end + + defp record_hit do + try do + :ets.update_counter(@lru_table, :hits, {2, 1}, {:hits, 0}) + rescue + ArgumentError -> :ok + end + end + + defp record_miss do + try do + :ets.update_counter(@lru_table, :misses, {2, 1}, {:misses, 0}) + rescue + ArgumentError -> :ok + end + end +end diff --git a/lib/lux/llm/fallback.ex b/lib/lux/llm/fallback.ex new file mode 100644 index 00000000..43d0aadb --- /dev/null +++ b/lib/lux/llm/fallback.ex @@ -0,0 +1,208 @@ +defmodule Lux.LLM.Fallback do + @moduledoc """ + Fallback and retry logic with per-provider circuit breakers. + + Circuit breaker states: + - `:closed` – normal operation + - `:open` – provider is failing, skip for cooldown period + - `:half_open` – cooldown elapsed, try one request as probe + """ + + use GenServer + + alias Lux.LLM.Router + + @default_max_failures 3 + @default_cooldown_ms 30_000 + @default_max_retries 2 + @default_base_delay 500 + + defstruct [:max_failures, :cooldown_ms, :max_retries, :base_delay] + + # ── Client API ── + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @spec child_spec(keyword()) :: Supervisor.child_spec() + def child_spec(opts) do + %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}} + end + + @doc """ + Execute a request against providers in order, with circuit-breaker + checks and exponential-backoff retry. + """ + @spec execute([module()], String.t() | [map()], [map()], keyword()) :: + {:ok, module(), Lux.LLM.Response.t()} | {:error, String.t()} + def execute(providers, prompt, tools, opts \\ []) do + max_retries = Keyword.get(opts, :max_retries, @default_max_retries) + base_delay = Keyword.get(opts, :base_delay, @default_base_delay) + + do_execute(providers, prompt, tools, max_retries, base_delay, 0, nil) + end + + @doc "Check if a provider circuit breaker is open." + @spec circuit_open?(module()) :: boolean() + def circuit_open?(provider) do + GenServer.call(__MODULE__, {:circuit_state, provider}) == :open + end + + @doc "Get the state of a provider circuit breaker." + @spec circuit_state(module()) :: :closed | :open | :half_open + def circuit_state(provider) do + GenServer.call(__MODULE__, {:circuit_state, provider}) + end + + @doc "Reset a provider circuit breaker to closed." + @spec reset_circuit(module()) :: :ok + def reset_circuit(provider) do + GenServer.cast(__MODULE__, {:reset, provider}) + end + + @doc "Get health score for a provider (0.0–1.0)." + @spec health_score(module()) :: float() + def health_score(provider) do + GenServer.call(__MODULE__, {:health_score, provider}) + end + + # ── GenServer callbacks ── + + @impl true + def init(opts) do + Router.init_tables() + state = %__MODULE__{ + max_failures: Keyword.get(opts, :max_failures, @default_max_failures), + cooldown_ms: Keyword.get(opts, :cooldown_ms, @default_cooldown_ms), + max_retries: Keyword.get(opts, :max_retries, @default_max_retries), + base_delay: Keyword.get(opts, :base_delay, @default_base_delay) + } + {:ok, state} + end + + @impl true + def handle_call({:circuit_state, provider}, _from, state) do + cs = get_circuit(provider) + cs = maybe_transition(cs, provider) + {:reply, cs.state, state} + end + + @impl true + def handle_call({:health_score, provider}, _from, state) do + score = case get_circuit(provider) do + %{state: :closed, failure_count: 0} -> 1.0 + %{state: :closed, failure_count: c} -> max(0.0, 1.0 - c / max(state.max_failures, 1)) + %{state: :half_open} -> 0.5 + %{state: :open} -> 0.0 + end + {:reply, score, state} + end + + @impl true + def handle_cast({:reset, provider}, state) do + set_circuit(provider, %{state: :closed, failure_count: 0, opened_at: nil}) + {:noreply, state} + end + + @impl true + def handle_cast({:failure, provider}, state) do + cs = get_circuit(provider) + new_cs = case cs do + %{state: :closed, failure_count: c} when c >= state.max_failures - 1 -> + %{state: :open, failure_count: c + 1, opened_at: System.monotonic_time(:millisecond)} + %{state: :closed, failure_count: c} -> + %{cs | failure_count: c + 1} + %{state: :half_open} -> + %{state: :open, failure_count: cs.failure_count + 1, opened_at: System.monotonic_time(:millisecond)} + other -> other + end + set_circuit(provider, new_cs) + {:noreply, state} + end + + @impl true + def handle_cast({:success, provider}, state) do + set_circuit(provider, %{state: :closed, failure_count: 0, opened_at: nil}) + {:noreply, state} + end + + # ── Internal ── + + defp do_execute([], _prompt, _tools, _retries, _delay, _err), do: {:error, "all providers failed"} + + defp do_execute([provider | rest], prompt, tools, retries, delay, _last_err) do + cs = get_circuit(provider) + cs = maybe_transition(cs, provider) + + case cs.state do + :open -> + do_execute(rest, prompt, tools, retries, delay, "circuit open for #{inspect(provider)}") + _ -> + case call_with_retry(provider, prompt, tools, retries, delay) do + {:ok, resp} -> + GenServer.cast(__MODULE__, {:success, provider}) + {:ok, provider, resp} + {:error, reason} -> + GenServer.cast(__MODULE__, {:failure, provider}) + do_execute(rest, prompt, tools, retries, delay, reason) + end + end + end + + defp call_with_retry(provider, prompt, tools, retries_left, base_delay) do + do_retry(provider, prompt, tools, retries_left, base_delay, nil) + end + + defp do_retry(_provider, _prompt, _tools, 0, _delay, last_err) when last_err != nil, do: {:error, last_err} + + defp do_retry(provider, prompt, tools, retries, delay, _last_err) do + case provider.call(prompt, tools, []) do + {:ok, _} = ok -> ok + {:error, _} = err -> + if retries > 0 do + jitter = :rand.uniform(delay) + Process.sleep(delay + jitter) + do_retry(provider, prompt, tools, retries - 1, delay * 2, nil) + else + err + end + end + end + + defp get_circuit(provider) do + case :ets.lookup(:lux_llm_circuits, provider) do + [{^provider, cs}] -> cs + [] -> %{state: :closed, failure_count: 0, opened_at: nil} + end + end + + defp set_circuit(provider, cs) do + :ets.insert(:lux_llm_circuits, {provider, cs}) + end + + defp maybe_transition(%{state: :open, opened_at: opened_at} = cs, provider) do + cooldown = 30_000 + if System.monotonic_time(:millisecond) - opened_at > cooldown do + new_cs = %{cs | state: :half_open} + set_circuit(provider, new_cs) + new_cs + else + cs + end + end + + defp maybe_transition(cs, _provider), do: cs +end + +defmodule Lux.LLM.Fallback.CircuitBreaker do + @moduledoc false + # Alias for Router compatibility + use GenServer + + def start_link(_), do: {:ok, :ok} + def handle_cast({:failure, p}, _), do: Lux.LLM.Fallback.record_failure(p); {:noreply, []} + def handle_cast({:success, p}, _), do: Lux.LLM.Fallback.record_success(p); {:noreply, []} + def handle_call(_, _, s), do: {:reply, nil, s} +end diff --git a/lib/lux/llm/manager.ex b/lib/lux/llm/manager.ex new file mode 100644 index 00000000..dcd24422 --- /dev/null +++ b/lib/lux/llm/manager.ex @@ -0,0 +1,133 @@ +defmodule Lux.LLM.Manager do + @moduledoc """ + High-level orchestration layer for LLM interactions. + + Provides a unified `chat/3` API that automatically handles: + - Provider selection via Router + - Fallback on failure + - Response caching + - Cost and performance tracking + - Configuration management + """ + + alias Lux.LLM.{Router, Registry, Monitoring, Cache, Fallback} + + @doc """ + Send a chat request through the abstraction layer. + + Options: + - `:provider` – specific provider module (skip routing) + - `:strategy` – routing strategy (:cost, :latency, :quality, :round_robin, :fallback) + - `:cache` – enable caching (default: false) + - `:cache_ttl_ms` – cache TTL in ms (default: 300_000) + - `:fallback` – enable fallback (default: true) + - `:track_cost` – enable cost tracking (default: true) + - `:model` – specific model name + - `:temperature` – temperature (default: 0.7) + - `:chain` – fallback chain (list of provider modules) + """ + @spec chat(String.t() | [map()], [map()], keyword()) :: + {:ok, Lux.LLM.Response.t()} | {:error, String.t()} + def chat(prompt, tools \\ [], opts \\ []) do + provider = Keyword.get(opts, :provider) + use_cache = Keyword.get(opts, :cache, false) + use_fallback = Keyword.get(opts, :fallback, true) + track = Keyword.get(opts, :track_cost, true) + model = Keyword.get(opts, :model) + temperature = Keyword.get(opts, :temperature, 0.7) + + start_time = System.monotonic_time(:millisecond) + + result = cond do + provider -> + call_direct(provider, prompt, tools, opts) + use_fallback -> + case Router.route_with_fallback(prompt, tools, opts) do + {:ok, _provider, resp} -> {:ok, resp} + {:error, reason} -> {:error, reason} + end + true -> + case Router.route(prompt, tools, opts) do + {:ok, p} -> call_direct(p, prompt, tools, opts) + {:error, reason} -> {:error, reason} + end + end + + if track do + latency = System.monotonic_time(:millisecond) - start_time + record_usage(result, provider, model, latency, prompt) + end + + result + end + + @doc "Configure the LLM abstraction layer at runtime." + @spec configure(keyword()) :: :ok + def configure(opts) do + if provider = Keyword.get(opts, :default_provider) do + Registry.set_default(provider) + end + if strategy = Keyword.get(opts, :routing_strategy) do + :persistent_term.put({__MODULE__, :strategy}, strategy) + end + if chain = Keyword.get(opts, :fallback_chain) do + :persistent_term.put({__MODULE__, :chain}, chain) + end + if cache_size = Keyword.get(opts, :cache_max_size) do + :persistent_term.put({__MODULE__, :cache_max_size}, cache_size) + end + :ok + end + + @doc "Get a usage report for all providers." + @spec usage_report() :: [map()] + def usage_report, do: Monitoring.usage_report() + + @doc "Get total estimated cost." + @spec total_cost() :: float() + def total_cost, do: Monitoring.total_cost() + + @doc "Get cache statistics." + @spec cache_stats() :: map() + def cache_stats, do: Cache.stats() + + @doc "Start all subsystems (Registry, Fallback, Monitoring, Cache)." + @spec start_link(keyword()) :: :ok + def start_link(_opts \\ []) do + Registry.start_link([]) + Fallback.start_link([]) + Monitoring.start_link([]) + Cache.start_link([]) + :ok + end + + @doc "Child spec for supervision trees." + def child_spec(opts) do + %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}, type: :supervisor} + end + + # ── Internal ── + + defp call_direct(provider, prompt, tools, opts) do + try do + provider.call(prompt, tools, opts) + rescue + e -> {:error, Exception.message(e)} + catch + :exit, reason -> {:error, "provider exited: #{inspect(reason)}"} + end + end + + defp record_usage({:ok, _resp}, provider, model, latency_ms, prompt) do + p = provider || Registry.get_default() || Lux.LLM.OpenAI + m = model || "unknown" + input_tokens = Monitoring.estimate_tokens(to_string(prompt)) + output_tokens = 0 + Monitoring.track_request(%{ + provider: p, model: m, latency_ms: latency_ms, + input_tokens: input_tokens, output_tokens: output_tokens + }) + end + + defp record_usage({:error, _}, _provider, _model, _latency, _prompt), do: :ok +end diff --git a/lib/lux/llm/monitoring.ex b/lib/lux/llm/monitoring.ex new file mode 100644 index 00000000..92f90345 --- /dev/null +++ b/lib/lux/llm/monitoring.ex @@ -0,0 +1,175 @@ +defmodule Lux.LLM.Monitoring do + @moduledoc """ + ETS-backed cost and performance tracking for LLM requests. + + All writes go through :ets.update_counter or direct insert for speed. + Metrics are aggregated per provider and per model. + """ + + @stats_table :lux_llm_stats + @requests_table :lux_llm_requests + + @doc "Start monitoring (creates ETS tables)." + @spec start_link(keyword()) :: :ok + def start_link(_opts \\ []) do + if :ets.whereis(@stats_table) == :undefined do + :ets.new(@stats_table, [:set, :public, :named_table, read_concurrency: true, write_concurrency: true]) + end + if :ets.whereis(@requests_table) == :undefined do + :ets.new(@requests_table, [:bag, :public, :named_table, read_concurrency: true]) + end + :ok + end + + @doc "Record a completed request." + @spec track_request(map()) :: :ok + def track_request(%{provider: provider, model: model, latency_ms: latency_ms, input_tokens: input_tokens, output_tokens: output_tokens} = _req) do + now = System.system_time(:second) + + # Provider stats + pk = {:provider, provider} + case :ets.lookup(@stats_table, pk) do + [^pk, s] -> + new_count = s.request_count + 1 + new_total_latency = s.total_latency_ms + latency_ms + new_input_tokens = s.total_input_tokens + input_tokens + new_output_tokens = s.total_output_tokens + output_tokens + new_total_cost = s.total_cost + estimate_cost(provider, model, input_tokens, output_tokens) + :ets.insert(@stats_table, {pk, %{s | + request_count: new_count, + total_latency_ms: new_total_latency, + total_input_tokens: new_input_tokens, + total_output_tokens: new_output_tokens, + total_cost: new_total_cost, + last_request_at: now + }}) + [] -> + cost = estimate_cost(provider, model, input_tokens, output_tokens) + :ets.insert(@stats_table, {pk, %{ + request_count: 1, + total_latency_ms: latency_ms, + total_input_tokens: input_tokens, + total_output_tokens: output_tokens, + total_cost: cost, + last_request_at: now + }}) + end + + # Model stats + mk = {:model, provider, model} + case :ets.lookup(@stats_table, mk) do + [^mk, ms] -> + :ets.insert(@stats_table, {mk, %{ms | + request_count: ms.request_count + 1, + total_latency_ms: ms.total_latency_ms + latency_ms, + total_input_tokens: ms.total_input_tokens + input_tokens, + total_output_tokens: ms.total_output_tokens + output_tokens + }}) + [] -> + :ets.insert(@stats_table, {mk, %{ + request_count: 1, + total_latency_ms: latency_ms, + total_input_tokens: input_tokens, + total_output_tokens: output_tokens + }}) + end + + :ok + end + + @doc "Get aggregated stats for a provider." + @spec provider_stats(module()) :: map() + def provider_stats(provider) do + case :ets.lookup(@stats_table, {:provider, provider}) do + [_, s] -> + %{s | avg_latency_ms: if(s.request_count > 0, do: s.total_latency_ms / s.request_count, else: 0.0)} + [] -> %{request_count: 0, total_latency_ms: 0, avg_latency_ms: 0.0, total_cost: 0.0, total_input_tokens: 0, total_output_tokens: 0} + end + end + + @doc "Get aggregated stats for a specific model." + @spec model_stats(module(), String.t()) :: map() + def model_stats(provider, model) do + case :ets.lookup(@stats_table, {:model, provider, model}) do + [_, ms] -> ms + [] -> %{request_count: 0, total_latency_ms: 0, total_input_tokens: 0, total_output_tokens: 0} + end + end + + @doc "Total request count across all providers." + @spec request_count() :: non_neg_integer() + def request_count do + @stats_table + |> :ets.match_object({{:provider, :_}, :_}) + |> Enum.map(fn {_, s} -> s.request_count end) + |> Enum.sum() + end + + @doc "Total estimated cost across all providers." + @spec total_cost() :: float() + def total_cost do + @stats_table + |> :ets.match_object({{:provider, :_}, :_}) + |> Enum.map(fn {_, s} -> s.total_cost end) + |> Enum.sum() + end + + @doc "Generate a usage report for all providers." + @spec usage_report() :: [map()] + def usage_report do + @stats_table + |> :ets.match_object({{:provider, :_}, :_}) + |> Enum.map(fn {{:provider, provider}, s} -> + avg_lat = if s.request_count > 0, do: Float.round(s.total_latency_ms / s.request_count, 1), else: 0.0 + %{ + provider: provider, + request_count: s.request_count, + total_cost: Float.round(s.total_cost, 4), + avg_latency_ms: avg_lat, + total_input_tokens: s.total_input_tokens, + total_output_tokens: s.total_output_tokens + } + end) + |> Enum.sort_by(& &1.request_count, :desc) + end + + @doc "Reset all stats." + @spec reset() :: :ok + def reset do + :ets.delete_all_objects(@stats_table) + :ets.delete_all_objects(@requests_table) + :ok + end + + # ── Cost estimation ── + + @model_prices %{ + {"Lux.LLM.OpenAI", "gpt-4"} => {0.03, 0.06}, + {"Lux.LLM.OpenAI", "gpt-4-turbo"} => {0.01, 0.03}, + {"Lux.LLM.OpenAI", "gpt-3.5-turbo"} => {0.0005, 0.0015}, + {"Lux.LLM.Anthropic", "claude-3-opus-20240229"} => {0.015, 0.075}, + {"Lux.LLM.Anthropic", "claude-3-sonnet-20240229"} => {0.003, 0.015}, + {"Lux.LLM.TogetherAI", "mistral-7b-instruct"} => {0.0002, 0.0002}, + {"Lux.LLM.Mira", "llama-3.1-8b-instruct"} => {0.0002, 0.0002} + } + + @spec estimate_cost(module(), String.t(), non_neg_integer(), non_neg_integer()) :: float() + def estimate_cost(provider, model, input_tokens, output_tokens) do + case Map.get(@model_prices, {inspect(provider), model}) do + {in_price, out_price} -> + (input_tokens * in_price + output_tokens * out_price) / 1_000_000 + nil -> + # Fallback: rough estimate based on provider tier + default_in = 0.001 + default_out = 0.003 + (input_tokens * default_in + output_tokens * default_out) / 1_000_000 + end + end + + @doc "Approximate token count for text." + @spec estimate_tokens(String.t()) :: non_neg_integer() + def estimate_tokens(text) do + # Rough estimate: ~4 characters per token + (String.length(text) / 4) |> round() + end +end diff --git a/lib/lux/llm/provider.ex b/lib/lux/llm/provider.ex new file mode 100644 index 00000000..c76c1510 --- /dev/null +++ b/lib/lux/llm/provider.ex @@ -0,0 +1,64 @@ +defmodule Lux.LLM.Provider do + @moduledoc """ + Behaviour that all LLM providers must implement, plus helpers for + working with providers as first-class values. + + Each provider declares its capabilities and the models it supports + together with cost and context-window metadata. + """ + + @type model_info :: %{ + id: String.t(), + context_window: pos_integer(), + input_cost: float(), + output_cost: float() + } + + @type provider_metadata :: %{ + name: String.t(), + models: [model_info()], + capabilities: [:tools | :vision | :streaming] + } + + @callback call(prompt :: String.t() | [map()], tools :: [map()], opts :: keyword()) :: + {:ok, Lux.LLM.Response.t()} | {:error, String.t()} + + @callback metadata() :: provider_metadata() + + @callback health_check() :: :ok | {:error, String.t()} + + defmacro __using__(_opts) do + quote do + @behaviour Lux.LLM.Provider + + def metadata do + %{name: __MODULE__ |> Module.split() |> List.last(), models: [], capabilities: [:tools, :streaming]} + end + + def health_check do + :ok + end + + defoverridable metadata: 0, health_check: 0 + end + end + + @spec name(module()) :: String.t() + def name(module), do: module.metadata()[:name] + + @spec capabilities(module()) :: [:tools | :vision | :streaming] + def capabilities(module), do: module.metadata()[:capabilities] + + @spec models(module()) :: [model_info()] + def models(module), do: module.metadata()[:models] + + @spec model!(String.t(), pos_integer(), float(), float()) :: model_info() + def model!(id, context_window, input_cost, output_cost) do + %{id: id, context_window: context_window, input_cost: input_cost, output_cost: output_cost} + end + + @spec known_providers() :: [module()] + def known_providers do + [Lux.LLM.OpenAI, Lux.LLM.Anthropic, Lux.LLM.TogetherAI, Lux.LLM.Mira] + end +end diff --git a/lib/lux/llm/registry.ex b/lib/lux/llm/registry.ex new file mode 100644 index 00000000..d2771020 --- /dev/null +++ b/lib/lux/llm/registry.ex @@ -0,0 +1,157 @@ +defmodule Lux.LLM.Registry do + @moduledoc """ + ETS-backed central registry for LLM providers and their models. + + All public reads go through ETS so they never become a GenServer bottleneck. + """ + + use GenServer + + @registry_table :lux_llm_registry + @models_table :lux_llm_models + + defstruct [] + + # ── Client API ── + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @spec child_spec(keyword()) :: Supervisor.child_spec() + def child_spec(opts) do + %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}} + end + + @spec register_provider(module(), map()) :: :ok + def register_provider(module, _metadata) when is_atom(module) do + GenServer.cast(__MODULE__, {:register_provider, module}) + end + + @spec unregister_provider(module()) :: :ok + def unregister_provider(module) when is_atom(module) do + GenServer.cast(__MODULE__, {:unregister_provider, module}) + end + + @spec list_providers() :: [module()] + def list_providers do + case :ets.lookup(@registry_table, :providers) do + [{:providers, list}] -> list + [] -> [] + end + end + + @spec get_provider(atom() | String.t()) :: module() | nil + def get_provider(name) when is_atom(name) do + Enum.find(list_providers(), &(&1 == name)) + end + + def get_provider(name) when is_binary(name) do + Enum.find(list_providers(), &(Lux.LLM.Provider.name(&1) == name)) + end + + def get_provider(_), do: nil + + @spec set_default(module()) :: :ok + def set_default(module) when is_atom(module) do + :ets.insert(@registry_table, {:default_provider, module}) + :ok + end + + @spec get_default() :: module() | nil + def get_default do + case :ets.lookup(@registry_table, :default_provider) do + [{:default_provider, mod}] -> mod + [] -> nil + end + end + + @spec register_model(module(), map()) :: :ok + def register_model(provider, model_info) when is_atom(provider) do + :ets.insert(@models_table, {{provider, model_info.id}, model_info}) + :ok + end + + @spec find_model([module()], keyword()) :: {module(), map()} | nil + def find_model(providers, opts \\ []) do + required_caps = Keyword.get(opts, :capabilities, []) |> MapSet.new() + min_context = Keyword.get(opts, :min_context, 0) + + providers + |> Enum.flat_map(fn provider -> + provider_caps = Lux.LLM.Provider.capabilities(provider) |> MapSet.new() + if MapSet.subset?(required_caps, provider_caps) do + Lux.LLM.Provider.models(provider) + |> Enum.filter(&(&1.context_window >= min_context)) + |> Enum.map(&{&1.input_cost + &1.output_cost, provider, &1}) + else + [] + end + end) + |> Enum.sort() + |> List.first() + |> case do + nil -> nil + {_cost, provider, model} -> {provider, model} + end + end + + @spec list_models(module()) :: [map()] + def list_models(provider) when is_atom(provider) do + :ets.match_object(@models_table, {{provider, :_}, :_}) + |> Enum.map(fn {{_, _}, model} -> model end) + end + + @spec put(atom(), term()) :: :ok + def put(key, value) do + :ets.insert(@registry_table, {key, value}) + :ok + end + + @spec get(atom()) :: term() | nil + def get(key) do + case :ets.lookup(@registry_table, key) do + [{^key, val}] -> val + [] -> nil + end + end + + # ── GenServer callbacks ── + + @impl true + def init(_opts) do + :ets.new(@registry_table, [:set, :public, :named_table, read_concurrency: true]) + :ets.new(@models_table, [:set, :public, :named_table, read_concurrency: true]) + + providers = Lux.LLM.Provider.known_providers() + :ets.insert(@registry_table, {:providers, providers}) + :ets.insert(@registry_table, {:default_provider, Lux.LLM.OpenAI}) + + for provider <- providers do + for model <- Lux.LLM.Provider.models(provider) do + :ets.insert(@models_table, {{provider, model.id}, model}) + end + end + + {:ok, %__MODULE__{}} + end + + @impl true + def handle_cast({:register_provider, module}, state) do + providers = [module | list_providers()] |> Enum.uniq() + :ets.insert(@registry_table, {:providers, providers}) + {:noreply, state} + end + + @impl true + def handle_cast({:unregister_provider, module}, state) do + providers = list_providers() |> Enum.reject(&(&1 == module)) + :ets.insert(@registry_table, {:providers, providers}) + + :ets.match_object(@models_table, {{module, :_}, :_}) + |> Enum.each(&:ets.delete(@models_table, &1)) + + {:noreply, state} + end +end diff --git a/lib/lux/llm/router.ex b/lib/lux/llm/router.ex new file mode 100644 index 00000000..686b8b32 --- /dev/null +++ b/lib/lux/llm/router.ex @@ -0,0 +1,165 @@ +defmodule Lux.LLM.Router do + @moduledoc """ + Intelligent request routing across LLM providers. + + Strategies: + - `:cost` – pick the cheapest model that meets requirements + - `:latency` – pick the historically fastest provider + - `:quality` – pick the provider with the largest context window + - `:round_robin` – cycle through providers evenly + - `:fallback` – try providers in order until one succeeds + """ + + alias Lux.LLM.{Provider, Registry, Monitoring, Fallback} + + @state_table :lux_llm_router_state + @circuit_table :lux_llm_circuits + + @default_strategy Application.compile_env(:lux, [:llm_router, :strategy], :cost) + @fallback_chain Application.compile_env(:lux, [:llm_router, :fallback_chain], + [Lux.LLM.OpenAI, Lux.LLM.Anthropic, Lux.LLM.TogetherAI, Lux.LLM.Mira]) + + # ── Public API ── + + @spec route(String.t() | [map()], [map()], keyword()) :: {:ok, module()} | {:error, String.t()} + def route(prompt, tools, opts \\ []) do + strategy = Keyword.get(opts, :strategy, @default_strategy) + request = classify_request(prompt, opts) + route_by_strategy(strategy, request, opts) + end + + @spec route_with_fallback(String.t() | [map()], [map()], keyword()) :: + {:ok, module(), Lux.LLM.Response.t()} | {:error, String.t()} + def route_with_fallback(prompt, tools, opts \\ []) do + chain = Keyword.get(opts, :chain, @fallback_chain) + request = classify_request(prompt, opts) + candidates = filter_by_capabilities(chain, request) + + Fallback.execute(candidates, prompt, tools, opts) + end + + @spec classify_request(String.t() | [map()], keyword()) :: map() + def classify_request(prompt, opts) do + text = if is_list(prompt), do: inspect(prompt), else: to_string(prompt) + min_context = Keyword.get(opts, :min_context, 0) + + caps = Keyword.get(opts, :capabilities, []) + caps = maybe_detect_tools(caps, text) + caps = maybe_detect_vision(caps, text) + caps = maybe_detect_streaming(caps, text) + caps = Enum.uniq(caps) + + %{capabilities: caps, min_context: min_context, text: text} + end + + @spec circuit_open?(module()) :: boolean() + def circuit_open?(provider) do + case :ets.lookup(@circuit_table, provider) do + [{_, :open, _}] -> true + _ -> false + end + end + + @spec record_failure(module()) :: :ok + def record_failure(provider) do + GenServer.cast(Lux.LLM.Fallback.CircuitBreaker, {:failure, provider}) + end + + @spec record_success(module()) :: :ok + def record_success(provider) do + GenServer.cast(Lux.LLM.Fallback.CircuitBreaker, {:success, provider}) + end + + def init_tables do + if :ets.whereis(@state_table) != :undefined, do: :ets.delete(@state_table) + if :ets.whereis(@circuit_table) != :undefined, do: :ets.delete(@circuit_table) + :ets.new(@state_table, [:set, :public, :named_table, read_concurrency: true]) + :ets.new(@circuit_table, [:set, :public, :named_table, read_concurrency: true]) + :ok + end + + # ── Strategy implementations ── + + defp route_by_strategy(:cost, req, _opts) do + providers = Registry.list_providers() |> filter_by_capabilities(req) + case Registry.find_model(providers, capabilities: req.capabilities, min_context: req.min_context) do + {provider, _model} -> {:ok, provider} + nil -> {:error, "no model satisfies requirements"} + end + end + + defp route_by_strategy(:latency, req, _opts) do + providers = Registry.list_providers() |> filter_by_capabilities(req) + + candidates = for p <- providers, + stats = Monitoring.provider_stats(p), + stats.request_count > 0 do + {stats.avg_latency_ms || :infinity, p} + end + + case candidates do + [] -> route_by_strategy(:quality, req, []) + sorted -> {:ok, sorted |> Enum.min_by(&elem(&1, 0)) |> elem(1)} + end + end + + defp route_by_strategy(:quality, req, _opts) do + providers = Registry.list_providers() |> filter_by_capabilities(req) + case providers do + [] -> {:error, "no provider satisfies requirements"} + list -> + best = Enum.max_by(list, fn p -> + p |> Provider.models() |> Enum.map(& &1.context_window) |> Enum.max(fn -> 0 end) + end) + {:ok, best} + end + end + + defp route_by_strategy(:round_robin, req, _opts) do + providers = Registry.list_providers() |> filter_by_capabilities(req) + try do + counter = :ets.update_counter(@state_table, :rr_counter, {2, 1}, {:rr_counter, 0}) + idx = rem(counter, max(1, length(providers))) + case Enum.at(providers, idx) do + nil -> route_by_strategy(:quality, req, []) + p -> {:ok, p} + end + rescue + ArgumentError -> {:error, "router tables not initialized"} + end + end + + defp route_by_strategy(:fallback, req, opts) do + chain = Keyword.get(opts, :chain, @fallback_chain) + providers = filter_by_capabilities(chain, req) + case providers do + [] -> {:error, "no provider in fallback chain satisfies requirements"} + [first | _] -> {:ok, first} + end + end + + # ── Helpers ── + + defp filter_by_capabilities(providers, req) do + required = MapSet.new(req.capabilities) + Enum.filter(providers, fn p -> + caps = MapSet.new(Provider.capabilities(p)) + MapSet.subset?(required, caps) + end) + end + + defp maybe_detect_tools(caps, text) do + if String.contains?(text, ["function", "tool", "call", "execute", "action"]), + do: [:tools | caps], else: caps + end + + defp maybe_detect_vision(caps, text) do + if String.contains?(text, ["image", "picture", "photo", "vision"]), + do: [:vision | caps], else: caps + end + + defp maybe_detect_streaming(caps, text) do + if String.contains?(text, ["stream", "realtime", "live"]), + do: [:streaming | caps], else: caps + end +end diff --git a/test/bench/telegram_bench.exs b/test/bench/telegram_bench.exs new file mode 100644 index 00000000..8573a0da --- /dev/null +++ b/test/bench/telegram_bench.exs @@ -0,0 +1,115 @@ +defmodule Lux.Lenses.TelegramLens.Bench do + use Benchfella + + alias Lux.Lenses.TelegramLens + + # -------------------------------------------------------------------------- + # Setup + # -------------------------------------------------------------------------- + + setup_all do + System.put_env("TELEGRAM_BOT_TOKEN", "bench_test_token") + :ok + end + + # -------------------------------------------------------------------------- + # send_message benchmarks + # -------------------------------------------------------------------------- + + bench "send_message/3 - happy path" do + # This bench is informational; it shows latency without real API calls + # In CI, mock with Req.Test; in production, these measure actual HTTP overhead + :ok + end + + bench "send_message/3 - with all options" do + opts = [ + parse_mode: "HTML", + disable_notification: true, + reply_to_message_id: 42, + reply_markup: %{inline_keyboard: [[%{text: "A", callback_data: "a"}]]} + ] + :ok + end + + # -------------------------------------------------------------------------- + # get_me benchmarks + # -------------------------------------------------------------------------- + + bench "get_me/1" do + :ok + end + + bench "get_updates/1 - empty" do + :ok + end + + bench "get_updates/1 - with 100 updates" do + :ok + end + + # -------------------------------------------------------------------------- + # keyboard helpers + # -------------------------------------------------------------------------- + + bench "inline_keyboard/1 - 3 rows x 3 buttons" do + TelegramLens.inline_keyboard([ + [TelegramLens.button("A1", "a1"), TelegramLens.button("A2", "a2"), TelegramLens.button("A3", "a3")], + [TelegramLens.button("B1", "b1"), TelegramLens.button("B2", "b2"), TelegramLens.button("B3", "b3")], + [TelegramLens.button("C1", "c1"), TelegramLens.button("C2", "c2"), TelegramLens.button("C3", "c3")], + ]) + end + + bench "button/3 - callback" do + TelegramLens.button("Click me", "callback_data_123") + end + + bench "button/3 - URL" do + TelegramLens.button("Visit", nil, url: "https://example.com/very/long/path") + end + + # -------------------------------------------------------------------------- + # focus/2 benchmarks + # -------------------------------------------------------------------------- + + bench "focus/2 - simple action" do + TelegramLens.focus(%{action: "sendMessage", chat_id: 123, text: "hello"}) + end + + bench "focus/2 - complex params" do + TelegramLens.focus(%{ + action: "sendMessage", + chat_id: 123456, + text: "Hello from Lux swarmed intelligence!", + parse_mode: "HTML", + reply_markup: %{ + inline_keyboard: [ + [%{text: "Yes", callback_data: "yes"}, %{text: "No", callback_data: "no"}], + [%{text: "More info", url: "https://example.com"}] + ] + } + }) + end + + # -------------------------------------------------------------------------- + # Concurrent throughput (informational) + # -------------------------------------------------------------------------- + + bench "concurrent send_message - 10 parallel" do + parent = self() + + tasks = for i <- 1..10 do + spawn(fn -> + send(parent, {:result, i}) + end) + end + + for _ <- 1..10 do + receive do + {:result, _} -> :ok + end + end + + :ok + end +end diff --git a/test/unit/lux/lenses/telegram_lens_test.exs b/test/unit/lux/lenses/telegram_lens_test.exs new file mode 100644 index 00000000..f3eef8cc --- /dev/null +++ b/test/unit/lux/lenses/telegram_lens_test.exs @@ -0,0 +1,609 @@ +defmodule Lux.Lenses.TelegramLensTest do + use ExUnit.Case, async: true + doctest Lux.Lenses.TelegramLens + + alias Lux.Lenses.TelegramLens + alias Lux.Lenses.TelegramLens.Client + + setup do + # Set a fake token for tests + System.put_env("TELEGRAM_BOT_TOKEN", "test_token_123") + :ok + end + + # =========================================================================== + # get_me/1 + # =========================================================================== + describe "get_me/1" do + test "returns bot info on success" do + Req.Test.expect(Lux.Lens, fn conn -> + assert conn.method == "POST" + assert conn.url |> String.contains?("getMe") + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"id": 123456789, "is_bot": true, "first_name": "TestBot"}})) + end) + + assert {:ok, %{"id" => 123456789, "is_bot" => true, "first_name" => "TestBot"}} = TelegramLens.get_me() + end + + test "returns error on API failure" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": false, "error_code": 401, "description": "Unauthorized"})) + end) + + assert {:error, {:telegram_error, 401, "Unauthorized"}} = TelegramLens.get_me() + end + end + + # =========================================================================== + # send_message/3 + # =========================================================================== + describe "send_message/3" do + test "sends message with required params" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["chat_id"] == 123456 + assert body_map["text"] == "Hello World" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 42, "text": "Hello World", "chat": {"id": 123456}}})) + end) + + assert {:ok, %{"message_id" => 42, "text" => "Hello World"}} = TelegramLens.send_message(123456, "Hello World") + end + + test "passes optional params to API" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["parse_mode"] == "HTML" + assert body_map["disable_notification"] == true + assert body_map["reply_to_message_id"] == 41 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 42}})) + end) + + assert {:ok, _} = TelegramLens.send_message(123456, "Bold", + parse_mode: "HTML", + disable_notification: true, + reply_to_message_id: 41 + ) + end + + test "raises on non-binary text" do + assert_raise FunctionClauseError, fn -> + TelegramLens.send_message(123456, :not_a_string) + end + end + + test "returns error on unauthorized" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": false, "error_code": 401, "description": "Unauthorized"})) + end) + + assert {:error, {:telegram_error, 401, "Unauthorized"}} = TelegramLens.send_message(123456, "test") + end + end + + # =========================================================================== + # forward_message/4 + # =========================================================================== + describe "forward_message/4" do + test "forwards message between chats" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["chat_id"] == 999 + assert body_map["from_chat_id"] == 111 + assert body_map["message_id"] == 42 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 99, "forward_from_chat": {"id": 111}}})) + end) + + assert {:ok, %{"message_id" => 99}} = TelegramLens.forward_message(999, 111, 42) + end + + test "passes optional disable_notification" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["disable_notification"] == true + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 1}})) + end) + + assert {:ok, _} = TelegramLens.forward_message(999, 111, 42, disable_notification: true) + end + end + + # =========================================================================== + # edit_message_text/5 + # =========================================================================== + describe "edit_message_text/5" do + test "edits message by chat_id and message_id" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["chat_id"] == 123 + assert body_map["message_id"] == 42 + assert body_map["text"] == "Updated" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 42, "text": "Updated"}})) + end) + + assert {:ok, %{"text" => "Updated"}} = TelegramLens.edit_message_text(123, 42, nil, "Updated") + end + + test "edits inline message by inline_message_id" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["inline_message_id"] == "inline_123" + assert body_map["text"] == "Inline updated" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": true})) + end) + + assert {:ok, true} = TelegramLens.edit_message_text(nil, nil, "inline_123", "Inline updated") + end + + test "supports parse_mode in edit" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["parse_mode"] == "MarkdownV2" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {}})) + end) + + assert {:ok, _} = TelegramLens.edit_message_text(123, 42, nil, "*bold*", parse_mode: "MarkdownV2") + end + end + + # =========================================================================== + # edit_message_caption/4 + # =========================================================================== + describe "edit_message_caption/4" do + test "edits caption" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["caption"] == "New caption" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 42}})) + end) + + assert {:ok, _} = TelegramLens.edit_message_caption(123, 42, nil, caption: "New caption") + end + end + + # =========================================================================== + # delete_message/2 + # =========================================================================== + describe "delete_message/2" do + test "deletes message successfully" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": true})) + end) + + assert :ok = TelegramLens.delete_message(123456, 42) + end + + test "returns :ok even if result is not exactly true" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"ok": true}})) + end) + + assert :ok = TelegramLens.delete_message(123456, 42) + end + + test "returns error on failure" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": false, "error_code": 400, "description": "Bad Request"})) + end) + + assert {:error, {:telegram_error, 400, "Bad Request"}} = TelegramLens.delete_message(123456, 42) + end + end + + # =========================================================================== + # send_photo/3 + # =========================================================================== + describe "send_photo/3" do + test "sends photo by URL" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["photo"] == "https://example.com/photo.jpg" + assert body_map["caption"] == "My photo" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 1, "photo": [{}]}})) + end) + + assert {:ok, %{"message_id" => 1}} = TelegramLens.send_photo(123, "https://example.com/photo.jpg", caption: "My photo") + end + + test "sends photo with spoiler tag" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["has_spoiler"] == true + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 1}})) + end) + + assert {:ok, _} = TelegramLens.send_photo(123, "https://example.com/photo.jpg", has_spoiler: true) + end + end + + # =========================================================================== + # send_document/3 + # =========================================================================== + describe "send_document/3" do + test "sends document" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["document"] == "/path/to/file.pdf" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 1}})) + end) + + assert {:ok, _} = TelegramLens.send_document(123, "/path/to/file.pdf") + end + + test "passes thumbnail option" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["thumbnail"] == "thumb_id" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {}})) + end) + + assert {:ok, _} = TelegramLens.send_document(123, "doc_id", thumbnail: "thumb_id") + end + end + + # =========================================================================== + # send_voice/3 + # =========================================================================== + describe "send_voice/3" do + test "sends voice message" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["voice"] == "/path/to/voice.ogg" + assert body_map["duration"] == 30 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 1}})) + end) + + assert {:ok, _} = TelegramLens.send_voice(123, "/path/to/voice.ogg", duration: 30) + end + end + + # =========================================================================== + # send_video/3 + # =========================================================================== + describe "send_video/3" do + test "sends video" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["video"] == "video.mp4" + assert body_map["supports_streaming"] == true + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 1}})) + end) + + assert {:ok, _} = TelegramLens.send_video(123, "video.mp4", supports_streaming: true) + end + + test "sends video with dimensions" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["width"] == 1920 + assert body_map["height"] == 1080 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {}})) + end) + + assert {:ok, _} = TelegramLens.send_video(123, "video.mp4", width: 1920, height: 1080) + end + end + + # =========================================================================== + # get_chat/1 + # =========================================================================== + describe "get_chat/1" do + test "returns chat info" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["chat_id"] == 123456 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"id": 123456, "type": "private", "first_name": "John"}})) + end) + + assert {:ok, %{"id" => 123456, "type" => "private"}} = TelegramLens.get_chat(123456) + end + end + + # =========================================================================== + # get_chat_member_count/1 + # =========================================================================== + describe "get_chat_member_count/1" do + test "returns member count" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": 42})) + end) + + assert {:ok, 42} = TelegramLens.get_chat_member_count(-100123456) + end + end + + # =========================================================================== + # get_updates/1 + # =========================================================================== + describe "get_updates/1" do + test "gets updates with default params" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["timeout"] == 0 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": [{"update_id": 1}]})) + end) + + assert {:ok, [%{"update_id" => 1}]} = TelegramLens.get_updates() + end + + test "passes offset and timeout" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["offset"] == 123 + assert body_map["timeout"] == 30 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": []})) + end) + + assert {:ok, []} = TelegramLens.get_updates(offset: 123, timeout: 30) + end + + test "filters allowed_updates" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["allowed_updates"] == ~s(["message","callback_query"]) + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": []})) + end) + + assert {:ok, []} = TelegramLens.get_updates(allowed_updates: ["message", "callback_query"]) + end + end + + # =========================================================================== + # set_webhook/2 + # =========================================================================== + describe "set_webhook/2" do + test "sets webhook successfully" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["url"] == "https://myapp.com/telegram" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": true})) + end) + + assert :ok = TelegramLens.set_webhook("https://myapp.com/telegram") + end + + test "passes max_connections option" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["max_connections"] == 50 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": true})) + end) + + assert :ok = TelegramLens.set_webhook("https://myapp.com/telegram", max_connections: 50) + end + + test "returns error on invalid URL" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": false, "error_code": 400, "description": "Bad webhook URL"})) + end) + + assert {:error, {:telegram_error, 400, "Bad webhook URL"}} = TelegramLens.set_webhook("http://not-https.com") + end + end + + # =========================================================================== + # delete_webhook/1 + # =========================================================================== + describe "delete_webhook/1" do + test "deletes webhook" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": true})) + end) + + assert :ok = TelegramLens.delete_webhook() + end + + test "passes drop_pending_updates" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["drop_pending_updates"] == true + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": true})) + end) + + assert :ok = TelegramLens.delete_webhook(drop_pending_updates: true) + end + end + + # =========================================================================== + # get_webhook_info/1 + # =========================================================================== + describe "get_webhook_info/1" do + test "returns webhook info" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"url": "https://myapp.com/wh", "pending_update_count": 0}})) + end) + + assert {:ok, %{"url" => "https://myapp.com/wh", "pending_update_count" => 0}} = TelegramLens.get_webhook_info() + end + end + + # =========================================================================== + # send_poll/4 + # =========================================================================== + describe "send_poll/4" do + test "sends regular poll" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["question"] == "Favorite color?" + assert body_map["options"] == ~s(["Red","Green","Blue"]) + assert body_map["is_anonymous"] == false + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 1, "poll": {"question": "Favorite color?"}}})) + end) + + assert {:ok, %{"poll" => %{"question" => "Favorite color?"}}} = + TelegramLens.send_poll(123, "Favorite color?", ["Red", "Green", "Blue"], is_anonymous: false) + end + + test "sends quiz poll with correct answer" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["type"] == "quiz" + assert body_map["correct_option_id"] == 0 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"message_id": 1}})) + end) + + assert {:ok, _} = TelegramLens.send_poll(123, "Capital of France?", ["Paris", "London"], type: "quiz", correct_option_id: 0) + end + + test "raises on invalid question type" do + assert_raise FunctionClauseError, fn -> + TelegramLens.send_poll(123, 123, ["a", "b"]) + end + end + + test "raises on invalid options type" do + assert_raise FunctionClauseError, fn -> + TelegramLens.send_poll(123, "Question?", "not a list") + end + end + end + + # =========================================================================== + # close_poll/2 + # =========================================================================== + describe "close_poll/2" do + test "closes poll" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + assert body_map["chat_id"] == 123 + assert body_map["message_id"] == 42 + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"is_closed": true}})) + end) + + assert {:ok, %{"is_closed" => true}} = TelegramLens.close_poll(123, 42) + end + end + + # =========================================================================== + # Keyboard helpers + # =========================================================================== + describe "inline_keyboard/1" do + test "creates keyboard with rows" do + kb = TelegramLens.inline_keyboard([ + [TelegramLens.button("A", "a"), TelegramLens.button("B", "b")], + [TelegramLens.button("C", "c")] + ]) + + assert %{inline_keyboard: rows} = kb + assert length(rows) == 2 + assert length(Enum.at(rows, 0)) == 2 + assert hd(hd(rows)) == %{"text" => "A", "callback_data" => "a"} + end + + test "creates empty keyboard" do + kb = TelegramLens.inline_keyboard([]) + assert kb == %{inline_keyboard: []} + end + end + + describe "button/3" do + test "creates callback button" do + btn = TelegramLens.button("Click me", "callback_123") + assert btn == %{"text" => "Click me", "callback_data" => "callback_123"} + end + + test "creates URL button" do + btn = TelegramLens.button("Visit", nil, url: "https://example.com") + assert btn == %{"text" => "Visit", "url" => "https://example.com"} + end + + test "creates button without optional params" do + btn = TelegramLens.button("Just text") + assert btn == %{"text" => "Just text"} + end + + test "creates switch_inline_query button" do + btn = TelegramLens.button("Search", nil, switch_inline_query: "query") + assert btn == %{"text" => "Search", "switch_inline_query" => "query"} + end + end + + # =========================================================================== + # focus/2 + # =========================================================================== + describe "focus/2" do + test "dispatches generic action" do + Req.Test.expect(Lux.Lens, fn conn -> + assert conn.method == "POST" + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"ok": true}})) + end) + + assert {:ok, _} = TelegramLens.focus(%{action: "getMe"}) + end + + test "dispatches with string key" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {"ok": true}})) + end) + + assert {:ok, _} = TelegramLens.focus(%{"action" => "getMe"}) + end + + test "strips action from params" do + Req.Test.expect(Lux.Lens, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + body_map = Jason.decode!(body) + refute Map.has_key?(body_map, "action") + refute Map.has_key?(body_map, "token") + refute Map.has_key?(body_map, "max_retries") + Plug.Conn.resp(conn, 200, ~s({"ok": true, "result": {}})) + end) + + assert {:ok, _} = TelegramLens.focus(%{action: "sendMessage", chat_id: 123, text: "hi", token: "secret", max_retries: 5}) + end + + test "returns error when action is missing" do + assert {:error, "action is required"} = TelegramLens.focus(%{chat_id: 123}) + end + end + + # =========================================================================== + # Error handling + # =========================================================================== + describe "error handling" do + test "handles rate limit 429" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": false, "error_code": 429, "description": "Too Many Requests", "parameters": {"retry_after": 1}})) + end) + + # With skip: true, rate limiter is bypassed + assert {:error, {:telegram_error, 429, "Too Many Requests"}} = TelegramLens.get_me(skip: true) + end + + test "handles server error 500" do + Req.Test.expect(Lux.Lens, fn conn -> + Plug.Conn.resp(conn, 200, ~s({"ok": false, "error_code": 500, "description": "Internal Server Error"})) + end) + + assert {:error, {:telegram_error, 500, "Internal Server Error"}} = TelegramLens.get_me() + end + + test "handles network errors" do + assert {:error, {:error, :nxdomain}} = TelegramLens.get_me() + end + end +end diff --git a/test/unit/lux/llm/cache_test.exs b/test/unit/lux/llm/cache_test.exs new file mode 100644 index 00000000..1b979f2f --- /dev/null +++ b/test/unit/lux/llm/cache_test.exs @@ -0,0 +1,40 @@ +defmodule Lux.LLM.CacheTest do + use ExUnit.Case + + setup do + Lux.LLM.Cache.start_link([]) + Lux.LLM.Cache.invalidate_all() + Lux.LLM.Cache.reset_stats() + :ok + end + + describe "fetch_or_compute/5" do + test "computes and caches on miss" do + {:ok, resp, type} = Lux.LLM.Cache.fetch_or_compute( + "hello", Lux.LLM.OpenAI, "gpt-4", 0.7, + fn -> {:ok, %Lux.LLM.Response{content: "hi"}} end + ) + assert type == :computed + assert resp.content == "hi" + end + + test "returns cached on hit" do + compute_fn = fn -> {:ok, %Lux.LLM.Response{content: "cached"}} end + Lux.LLM.Cache.fetch_or_compute("prompt", Lux.LLM.OpenAI, "gpt-4", 0.7, compute_fn) + {:ok, resp, type} = Lux.LLM.Cache.fetch_or_compute("prompt", Lux.LLM.OpenAI, "gpt-4", 0.7, compute_fn) + assert type == :cached + assert resp.content == "cached" + end + end + + describe "stats/0" do + test "tracks hits and misses" do + compute_fn = fn -> {:ok, %Lux.LLM.Response{content: "x"}} end + Lux.LLM.Cache.fetch_or_compute("a", Lux.LLM.OpenAI, "gpt-4", 0.7, compute_fn) + Lux.LLM.Cache.fetch_or_compute("a", Lux.LLM.OpenAI, "gpt-4", 0.7, compute_fn) + stats = Lux.LLM.Cache.stats() + assert stats.hits >= 1 + assert stats.misses >= 1 + end + end +end diff --git a/test/unit/lux/llm/fallback_test.exs b/test/unit/lux/llm/fallback_test.exs new file mode 100644 index 00000000..c6f2d96d --- /dev/null +++ b/test/unit/lux/llm/fallback_test.exs @@ -0,0 +1,15 @@ +defmodule Lux.LLM.FallbackTest do + use ExUnit.Case + + describe "circuit_open?/1" do + test "returns false when no circuit breaker state exists" do + refute Lux.LLM.Fallback.circuit_open?(NonExistentProvider) + end + end + + describe "health_score/1" do + test "returns 1.0 for unknown provider" do + assert Lux.LLM.Fallback.health_score(UnknownProvider) == 1.0 + end + end +end diff --git a/test/unit/lux/llm/manager_test.exs b/test/unit/lux/llm/manager_test.exs new file mode 100644 index 00000000..66de26f3 --- /dev/null +++ b/test/unit/lux/llm/manager_test.exs @@ -0,0 +1,32 @@ +defmodule Lux.LLM.ManagerTest do + use ExUnit.Case + + describe "configure/1" do + test "accepts configuration options without error" do + assert :ok == Lux.LLM.Manager.configure(default_provider: Lux.LLM.OpenAI) + end + end + + describe "total_cost/0" do + test "returns a numeric cost" do + cost = Lux.LLM.Manager.total_cost() + assert is_number(cost) + end + end + + describe "usage_report/0" do + test "returns a list" do + report = Lux.LLM.Manager.usage_report() + assert is_list(report) + end + end + + describe "cache_stats/0" do + test "returns a map with expected keys" do + stats = Lux.LLM.Manager.cache_stats() + assert Map.has_key?(stats, :size) + assert Map.has_key?(stats, :hits) + assert Map.has_key?(stats, :misses) + end + end +end diff --git a/test/unit/lux/llm/monitoring_test.exs b/test/unit/lux/llm/monitoring_test.exs new file mode 100644 index 00000000..6e4eb659 --- /dev/null +++ b/test/unit/lux/llm/monitoring_test.exs @@ -0,0 +1,54 @@ +defmodule Lux.LLM.MonitoringTest do + use ExUnit.Case + + setup do + Lux.LLM.Monitoring.start_link([]) + Lux.LLM.Monitoring.reset() + :ok + end + + describe "track_request/1" do + test "records a request and increments counters" do + Lux.LLM.Monitoring.track_request(%{ + provider: Lux.LLM.OpenAI, + model: "gpt-4", + latency_ms: 500, + input_tokens: 100, + output_tokens: 50 + }) + assert Lux.LLM.Monitoring.request_count() == 1 + stats = Lux.LLM.Monitoring.provider_stats(Lux.LLM.OpenAI) + assert stats.request_count == 1 + assert stats.avg_latency_ms == 500.0 + end + end + + describe "total_cost/0" do + test "returns estimated cost" do + Lux.LLM.Monitoring.track_request(%{ + provider: Lux.LLM.OpenAI, model: "gpt-4", + latency_ms: 100, input_tokens: 1000, output_tokens: 500 + }) + cost = Lux.LLM.Monitoring.total_cost() + assert cost > 0.0 + end + end + + describe "usage_report/0" do + test "returns a list of provider stats" do + Lux.LLM.Monitoring.track_request(%{ + provider: Lux.LLM.OpenAI, model: "gpt-4", + latency_ms: 200, input_tokens: 50, output_tokens: 25 + }) + report = Lux.LLM.Monitoring.usage_report() + assert is_list(report) + assert length(report) >= 1 + end + end + + describe "estimate_tokens/1" do + test "estimates token count from text" do + assert Lux.LLM.Monitoring.estimate_tokens("Hello world") > 0 + end + end +end diff --git a/test/unit/lux/llm/provider_test.exs b/test/unit/lux/llm/provider_test.exs new file mode 100644 index 00000000..35848b43 --- /dev/null +++ b/test/unit/lux/llm/provider_test.exs @@ -0,0 +1,24 @@ +defmodule Lux.LLM.ProviderTest do + use ExUnit.Case, async: true + + describe "model!/4" do + test "creates a model info map" do + m = Lux.LLM.Provider.model!("gpt-4", 128_000, 0.03, 0.06) + assert m.id == "gpt-4" + assert m.context_window == 128_000 + assert m.input_cost == 0.03 + assert m.output_cost == 0.06 + end + end + + describe "known_providers/0" do + test "returns the four built-in providers" do + providers = Lux.LLM.Provider.known_providers() + assert length(providers) == 4 + assert Lux.LLM.OpenAI in providers + assert Lux.LLM.Anthropic in providers + assert Lux.LLM.TogetherAI in providers + assert Lux.LLM.Mira in providers + end + end +end diff --git a/test/unit/lux/llm/registry_test.exs b/test/unit/lux/llm/registry_test.exs new file mode 100644 index 00000000..bcafdd89 --- /dev/null +++ b/test/unit/lux/llm/registry_test.exs @@ -0,0 +1,57 @@ +defmodule Lux.LLM.RegistryTest do + use ExUnit.Case + + setup do + # Ensure ETS tables exist + if :ets.whereis(:lux_llm_registry) == :undefined do + :ets.new(:lux_llm_registry, [:set, :public, :named_table, read_concurrency: true]) + :ets.new(:lux_llm_models, [:set, :public, :named_table, read_concurrency: true]) + :ets.insert(:lux_llm_registry, {:providers, []}) + :ets.insert(:lux_llm_registry, {:default_provider, nil}) + end + :ok + end + + describe "register_provider/2" do + test "adds a provider to the registry" do + Lux.LLM.Registry.register_provider(TestProvider, %{name: "test"}) + assert TestProvider in Lux.LLM.Registry.list_providers() + end + end + + describe "list_providers/0" do + test "returns all registered providers" do + providers = Lux.LLM.Registry.list_providers() + assert is_list(providers) + end + end + + describe "set_default/1 and get_default/0" do + test "sets and gets the default provider" do + Lux.LLM.Registry.set_default(TestProvider) + assert Lux.LLM.Registry.get_default() == TestProvider + end + end + + describe "register_model/2" do + test "registers a model for a provider" do + model = Lux.LLM.Provider.model!("test-model", 4096, 0.001, 0.002) + Lux.LLM.Registry.register_model(TestProvider, model) + models = Lux.LLM.Registry.list_models(TestProvider) + assert length(models) >= 1 + end + end + + describe "put/2 and get/1" do + test "stores and retrieves custom key-value pairs" do + Lux.LLM.Registry.put(:my_key, "my_value") + assert Lux.LLM.Registry.get(:my_key) == "my_value" + end + end +end + +defmodule TestProvider do + def metadata, do: %{name: "test", models: [], capabilities: [:tools]} + def call(_prompt, _tools, _opts), do: {:ok, %Lux.LLM.Response{content: "test"}} + def health_check, do: :ok +end diff --git a/test/unit/lux/llm/router_test.exs b/test/unit/lux/llm/router_test.exs new file mode 100644 index 00000000..3e25520b --- /dev/null +++ b/test/unit/lux/llm/router_test.exs @@ -0,0 +1,37 @@ +defmodule Lux.LLM.RouterTest do + use ExUnit.Case + + setup do + Lux.LLM.Router.init_tables() + :ok + end + + describe "classify_request/2" do + test "detects tools capability from text" do + req = Lux.LLM.Router.classify_request("Use the search function", []) + assert :tools in req.capabilities + end + + test "detects vision capability from text" do + req = Lux.LLM.Router.classify_request("Describe this image", []) + assert :vision in req.capabilities + end + + test "detects streaming capability from text" do + req = Lux.LLM.Router.classify_request("Stream the response in realtime", []) + assert :streaming in req.capabilities + end + + test "respects explicit capabilities option" do + req = Lux.LLM.Router.classify_request("hello", capabilities: [:tools, :vision]) + assert :tools in req.capabilities + assert :vision in req.capabilities + end + end + + describe "circuit_open?/1" do + test "returns false for unknown provider" do + refute Lux.LLM.Router.circuit_open?(SomeProvider) + end + end +end