Skip to content

Commit 0ce4bb1

Browse files
author
Vidas P
committed
Process one message at a time
Make agents process only one message at a time and remove `bulk_receive` functionality. Processing multiple messages in a go was mostly a performance improvement. Semantically it is much cleaner when `receive` processes a single message so that it matches transaction boundaries. This also opens the doors to solving existing concurrency issues. Breaking change: CSV Agent did use the functionality, where it would process several messages (rows) in one go and emit a single CSV message. This behavior wasn't deterministic and from now on it is limited to producing one CSV message from one `data` message containing multiple rows. (Custom and remote agents already conform to a single message per receive API).
1 parent 3e4318e commit 0ce4bb1

File tree

69 files changed

+525
-586
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+525
-586
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

88
## [current]
9+
### Changed
10+
- Semantics of message delivery got different: agents now process a single
11+
message at a time, thus matching transactional boundaries.
12+
- BREAKING: CSV Agent in 'serialize' mode handles only a single message at a
13+
time and can't aggregate multiple 'row' messages into a single CSV. Old
14+
behaviour wasn't exactly deterministic, suggested way would be to use
15+
`Digest Agent` to aggregate messages and then serialize them with
16+
`CSV Agent`.
17+
918
### Fixed
1019
- Update nokogiri (CVE-2020-7595)
1120

21+
1222
## [0.9.6] - 2019-12-09
1323
### Added
1424
- Support for remote agents. Custom agents can now be written in any

app/concerns/dry_runnable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def dry_run!(message = nil)
2626
@dry_run_logger.info('Dry Run started')
2727
if message
2828
raise 'This agent cannot receive an message!' unless can_receive_messages?
29-
receive([message])
29+
receive(message)
3030
else
3131
check
3232
end

app/concerns/liquid_interpolatable.rb

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,12 @@ def interpolation_context
4343
# Take the given object as "self" in the current interpolation
4444
# context while running a given block.
4545
#
46-
# The most typical use case for this is to evaluate options for each
47-
# received message like this:
46+
# The most typical use case for this is to evaluate
47+
# options for a received message like this:
4848
#
49-
# def receive(incoming_messages)
50-
# incoming_messages.each do |message|
51-
# interpolate_with(message) do
49+
# def receive(message)
50+
# interpolate_with(message) do
5251
# # Handle each message based on "interpolated" options.
53-
# end
5452
# end
5553
# end
5654
def interpolate_with(self_object)

app/concerns/sortable_messages.rb

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,11 @@ def check
9595
end
9696
end
9797

98-
def receive(incoming_messages)
98+
def receive(message)
9999
return super unless messages_order || include_sort_info?
100-
# incoming messages should be processed sequentially
101-
incoming_messages.each do |message|
102-
sorting_messages do
103-
super([message])
104-
end
100+
101+
sorting_messages do
102+
super(message)
105103
end
106104
end
107105

app/jobs/agent_receive_job.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ class AgentReceiveJob < ActiveJob::Base
22
# Given an Agent id and an array of Message ids, load the Agent, call #receive on it with the Message objects, and then
33
# save it with an updated `last_receive_at` timestamp.
44
# rubocop:disable Style/RescueStandardError
5-
def perform(agent_id, message_ids)
5+
def perform(agent_id, message_id)
66
agent = Agent.find(agent_id)
77
begin
88
return if agent.unavailable?
9-
agent.receive(Message.where(id: message_ids).order(:id))
9+
agent.receive(Message.find(message_id))
1010
agent.last_receive_at = Time.now
1111
agent.save!
1212
rescue => e

app/models/agent.rb

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def default_options
9595
{}
9696
end
9797

98-
def receive(messages)
98+
def receive(message)
9999
# Implement me in your subclass of Agent.
100100
end
101101

@@ -238,10 +238,6 @@ def can_dry_run?
238238
self.class.can_dry_run?
239239
end
240240

241-
def no_bulk_receive?
242-
self.class.no_bulk_receive?
243-
end
244-
245241
def log(message, options = {})
246242
AgentLog.log_for_agent(self, message, options)
247243
end
@@ -371,14 +367,6 @@ def can_dry_run?
371367
!!@can_dry_run
372368
end
373369

374-
def no_bulk_receive!
375-
@no_bulk_receive = true
376-
end
377-
378-
def no_bulk_receive?
379-
!!@no_bulk_receive
380-
end
381-
382370
def gem_dependency_check
383371
@gem_dependencies_checked = true
384372
@gem_dependencies_met = yield
@@ -424,11 +412,7 @@ def receive!(options = {})
424412
agent.update_attribute :last_checked_message_id, message_ids.max
425413
# rubocop:enable Rails/SkipsModelValidations
426414

427-
if agent.no_bulk_receive?
428-
message_ids.each { |message_id| Agent.async_receive(agent.id, [message_id]) }
429-
else
430-
Agent.async_receive(agent.id, message_ids)
431-
end
415+
message_ids.each { |message_id| Agent.async_receive(agent.id, message_id) }
432416
end
433417

434418
{
@@ -440,8 +424,8 @@ def receive!(options = {})
440424

441425
# This method will enqueue an AgentReceiveJob job. It accepts Agent and Message ids instead of a literal ActiveRecord
442426
# models because it is preferable to serialize jobs with ids.
443-
def async_receive(agent_id, message_ids)
444-
AgentReceiveJob.perform_later(agent_id, message_ids)
427+
def async_receive(agent_id, message_id)
428+
AgentReceiveJob.perform_later(agent_id, message_id)
445429
end
446430

447431
# Given a schedule name, run `check` via `bulk_check` on all Agents with that schedule.

app/models/agents/attribute_difference_agent.rb

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,8 @@ def validate_options
4949
end
5050
end
5151

52-
def receive(incoming_messages)
53-
incoming_messages.each do |message|
54-
handle(interpolated(message), message)
55-
end
52+
def receive(message)
53+
handle(interpolated(message), message)
5654
end
5755

5856
private

app/models/agents/buffer_agent.rb

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,14 @@ def validate_options
5555
end
5656
# rubocop:enable Metrics/CyclomaticComplexity
5757

58-
def receive(incoming_messages)
59-
incoming_messages.each do |message|
60-
memory['message_ids'] ||= []
61-
memory['message_ids'] << message.id
62-
if memory['message_ids'].length > interpolated['max_messages'].to_i
63-
if options['keep'] == 'newest'
64-
memory['message_ids'].shift
65-
else
66-
memory['message_ids'].pop
67-
end
58+
def receive(message)
59+
memory['message_ids'] ||= []
60+
memory['message_ids'] << message.id
61+
if memory['message_ids'].length > interpolated['max_messages'].to_i
62+
if options['keep'] == 'newest'
63+
memory['message_ids'].shift
64+
else
65+
memory['message_ids'].pop
6866
end
6967
end
7068
end

app/models/agents/change_detector_agent.rb

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,10 @@ def validate_options
3737
end
3838
end
3939

40-
def receive(incoming_messages)
41-
incoming_messages.each do |message|
42-
interpolation_context.stack do
43-
interpolation_context['last_property'] = last_property
44-
handle(interpolated(message), message)
45-
end
40+
def receive(message)
41+
interpolation_context.stack do
42+
interpolation_context['last_property'] = last_property
43+
handle(interpolated(message), message)
4644
end
4745
end
4846

app/models/agents/commander_agent.rb

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,9 @@ def check
3232
control!
3333
end
3434

35-
def receive(incoming_messages)
36-
incoming_messages.each do |message|
37-
interpolate_with(message) do
38-
control!
39-
end
35+
def receive(message)
36+
interpolate_with(message) do
37+
control!
4038
end
4139
end
4240
end

0 commit comments

Comments
 (0)