Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions app/lib/message_dequeuer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,39 @@ module MessageDequeuer

class << self

  # Maximum number of re-attempts after a deadlock before giving up.
  MAX_DEADLOCK_RETRIES = 3
  # Base delay before the first retry; doubled on every further attempt.
  DEADLOCK_RETRY_DELAY = 0.1 # seconds

  # Processes a message through InitialProcessor, retrying when the database
  # reports a deadlock.
  #
  # The processor is built and run exactly once per attempt, and only inside
  # the retry guard, so a deadlock on the very first attempt is retried too and
  # a successful message is never processed twice.
  #
  # @param message [#id, #retry_later, #destroyed?] the message to process
  # @param logger [#warn, #error] logger that accepts a text plus tagged fields
  # @return [Object] whatever InitialProcessor#process returns
  # @raise [ActiveRecord::Deadlocked] when the deadlock persists after
  #   MAX_DEADLOCK_RETRIES retries (the message is asked to retry later first,
  #   unless it has been destroyed)
  def process(message, logger:)
    retries = 0
    begin
      InitialProcessor.new(message, logger: logger).process
    rescue ActiveRecord::Deadlocked => e
      retries += 1
      if retries <= MAX_DEADLOCK_RETRIES
        # Exponential backoff with jitter to reduce contention
        sleep_time = (DEADLOCK_RETRY_DELAY * (2**(retries - 1))) + rand(0.0..0.05)
        logger.warn "Deadlock detected (attempt #{retries}/#{MAX_DEADLOCK_RETRIES}), retrying in #{sleep_time.round(3)}s",
                    message_id: message.id,
                    error: e.message
        deadlock_sleep(sleep_time)
        retry # re-runs the begin block; `retries` keeps its value
      end

      logger.error "Deadlock persisted after #{MAX_DEADLOCK_RETRIES} retries, requeueing message",
                   message_id: message.id,
                   error: e.message
      # Requeue the message for later processing
      message.retry_later unless message.destroyed?
      raise
    end
  end

  private

  # Sleeps for +time+ seconds between deadlock retries.
  # NOTE(review): presumably kept as its own method so the delay can be
  # stubbed in specs — confirm.
  def deadlock_sleep(time)
    sleep(time)
  end

end
Expand Down
49 changes: 40 additions & 9 deletions app/lib/message_dequeuer/outgoing_message_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,26 @@ def throttle_all_queued_messages_for_domain(domain, throttled_until)
retry_after = throttled_until + 10.seconds

# Update all queued messages for this domain that don't already have a later retry_after
updated_count = QueuedMessage.where(server_id: queued_message.server_id, domain: domain)
.where("retry_after IS NULL OR retry_after < ?", retry_after)
.where.not(id: queued_message.id)
.update_all(retry_after: retry_after)
# Update in small batches (LIMIT per statement) to reduce lock contention
updated_count = 0
begin
batch_size = 100
loop do
# Update in batches to reduce lock contention
count = QueuedMessage.where(server_id: queued_message.server_id, domain: domain)
.where("retry_after IS NULL OR retry_after < ?", retry_after)
.where.not(id: queued_message.id)
.limit(batch_size)
.update_all(retry_after: retry_after)
updated_count += count
break if count < batch_size
end
rescue ActiveRecord::Deadlocked => e
log "deadlock during domain throttle batch update, partial update completed",
domain: domain,
updated_count: updated_count,
error: e.message
end

return unless updated_count > 0

Expand Down Expand Up @@ -411,11 +427,26 @@ def requeue_messages_for_mx(mx_domain, delay_seconds)
retry_after = Time.current + delay_seconds.seconds + 10.seconds

# Update all queued messages for this MX domain
updated_count = QueuedMessage
.where(server_id: queued_message.server_id, mx_domain: mx_domain)
.where("retry_after IS NULL OR retry_after < ?", retry_after)
.where.not(id: queued_message.id)
.update_all(retry_after: retry_after)
# Use a smaller batch size to reduce lock contention
updated_count = 0
begin
batch_size = 100
loop do
count = QueuedMessage
.where(server_id: queued_message.server_id, mx_domain: mx_domain)
.where("retry_after IS NULL OR retry_after < ?", retry_after)
.where.not(id: queued_message.id)
.limit(batch_size)
.update_all(retry_after: retry_after)
updated_count += count
break if count < batch_size
end
rescue ActiveRecord::Deadlocked => e
log "deadlock during MX rate limit batch update, partial update completed",
mx_domain: mx_domain,
updated_count: updated_count,
error: e.message
end

return unless updated_count > 0

Expand Down
29 changes: 23 additions & 6 deletions app/models/queued_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
#
# Indexes
#
# index_queued_messages_on_domain (domain)
# index_queued_messages_on_message_id (message_id)
# index_queued_messages_on_mx_domain (mx_domain)
# index_queued_messages_on_server_id (server_id)
# index_queued_messages_on_batch_lock (batch_key,ip_address_id,locked_by,locked_at)
# index_queued_messages_on_domain (domain)
# index_queued_messages_on_lock_and_retry (locked_by,locked_at,retry_after,ip_address_id)
# index_queued_messages_on_message_id (message_id)
# index_queued_messages_on_mx_domain (mx_domain)
# index_queued_messages_on_server_domain_retry (server_id,domain,retry_after)
# index_queued_messages_on_server_id (server_id)
# index_queued_messages_on_server_mx_retry (server_id,mx_domain,retry_after)
#

class QueuedMessage < ApplicationRecord
Expand Down Expand Up @@ -114,8 +118,21 @@ def batchable_messages(limit = 10)
else
time = Time.now
locker = Postal.locker_name
self.class.ready.where(batch_key: batch_key, ip_address_id: ip_address_id, locked_by: nil, locked_at: nil).limit(limit).update_all(locked_by: locker, locked_at: time)
QueuedMessage.where(batch_key: batch_key, ip_address_id: ip_address_id, locked_by: locker, locked_at: time).where.not(id: id)

# Temporarily shorten the session lock wait timeout so a contended lock fails fast instead of blocking
begin
ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1")
self.class.ready.where(batch_key: batch_key, ip_address_id: ip_address_id, locked_by: nil, locked_at: nil).limit(limit).update_all(locked_by: locker, locked_at: time)
QueuedMessage.where(batch_key: batch_key, ip_address_id: ip_address_id, locked_by: locker, locked_at: time).where.not(id: id)
rescue ActiveRecord::StatementInvalid => e
# If we get a lock timeout, return empty array rather than failing
raise unless e.message.include?("Lock wait timeout") || e.message.include?("Deadlock")

[]
ensure
# Reset to default timeout
ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = DEFAULT")
end
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true

class AddIndexesForQueuedMessageDeadlockPrevention < ActiveRecord::Migration[7.1]

  # Composite indexes added to queued_messages, keyed by index name and listed
  # in creation order. Each entry notes the WHERE clause it is meant to serve.
  QUEUED_MESSAGE_INDEXES = {
    # Locking query (NOTE(review): said to live in ProcessQueuedMessagesJob — confirm):
    #   WHERE ip_address_id IN (...) AND locked_by IS NULL AND locked_at IS NULL
    #     AND (retry_after IS NULL OR retry_after < ?)
    "index_queued_messages_on_lock_and_retry" => [:locked_by, :locked_at, :retry_after, :ip_address_id],

    # Batched retry_after updates by domain:
    #   WHERE server_id = ? AND domain = ? AND (retry_after IS NULL OR retry_after < ?)
    "index_queued_messages_on_server_domain_retry" => [:server_id, :domain, :retry_after],

    # Batched retry_after updates by MX domain (requeue_messages_for_mx):
    #   WHERE server_id = ? AND mx_domain = ? AND (retry_after IS NULL OR retry_after < ?)
    "index_queued_messages_on_server_mx_retry" => [:server_id, :mx_domain, :retry_after],

    # batchable_messages query:
    #   WHERE batch_key = ? AND ip_address_id = ? AND locked_by IS NULL AND locked_at IS NULL
    "index_queued_messages_on_batch_lock" => [:batch_key, :ip_address_id, :locked_by, :locked_at]
  }.freeze

  # Adds each index above to queued_messages; written with add_index inside
  # `change` so the migration stays reversible.
  def change
    QUEUED_MESSAGE_INDEXES.each do |index_name, columns|
      add_index :queued_messages, columns, name: index_name
    end
  end

end
6 changes: 5 additions & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2026_01_30_100440) do
ActiveRecord::Schema[7.1].define(version: 2026_02_03_100000) do
create_table "additional_route_endpoints", id: :integer, charset: "utf8mb4", collation: "utf8mb4_general_ci", force: :cascade do |t|
t.integer "route_id"
t.string "endpoint_type"
Expand Down Expand Up @@ -396,9 +396,13 @@
t.boolean "manual", default: false
t.string "batch_key"
t.string "mx_domain"
t.index ["batch_key", "ip_address_id", "locked_by", "locked_at"], name: "index_queued_messages_on_batch_lock"
t.index ["domain"], name: "index_queued_messages_on_domain", length: 8
t.index ["locked_by", "locked_at", "retry_after", "ip_address_id"], name: "index_queued_messages_on_lock_and_retry"
t.index ["message_id"], name: "index_queued_messages_on_message_id"
t.index ["mx_domain"], name: "index_queued_messages_on_mx_domain"
t.index ["server_id", "domain", "retry_after"], name: "index_queued_messages_on_server_domain_retry"
t.index ["server_id", "mx_domain", "retry_after"], name: "index_queued_messages_on_server_mx_retry"
t.index ["server_id"], name: "index_queued_messages_on_server_id"
end

Expand Down
Loading