Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ group :development, :test do
gem 'guard-rspec'
gem 'faker'
gem 'timecop'
gem "debugger", platform: :mri
# gem "debugger", platform: :mri

if RUBY_PLATFORM =~ /darwin/
# OS X integration
Expand Down
9 changes: 0 additions & 9 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ GEM
arel (3.0.2)
builder (3.0.4)
coderay (1.0.8)
columnize (0.3.6)
debugger (1.2.1)
columnize (>= 0.3.1)
debugger-linecache (~> 1.1.1)
debugger-ruby_core_source (~> 1.1.4)
debugger-linecache (1.1.2)
debugger-ruby_core_source (>= 1.1.1)
debugger-ruby_core_source (1.1.4)
diff-lcs (1.1.3)
erubis (2.7.0)
faker (1.1.2)
Expand Down Expand Up @@ -143,7 +135,6 @@ PLATFORMS

DEPENDENCIES
batch_api!
debugger
faker
guard
guard-rspec
Expand Down
3 changes: 2 additions & 1 deletion lib/batch_api/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ module BatchApi
endpoint: "/batch",
limit: 50,
batch_middleware: InternalMiddleware::DEFAULT_BATCH_MIDDLEWARE,
operation_middleware: InternalMiddleware::DEFAULT_OPERATION_MIDDLEWARE
operation_middleware: InternalMiddleware::DEFAULT_OPERATION_MIDDLEWARE,
close_connection: InternalMiddleware::DEFAULT_CLOSE_CONNECTION
}

# Batch API Configuration
Expand Down
8 changes: 8 additions & 0 deletions lib/batch_api/internal_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ module InternalMiddleware
use InternalMiddleware::DecodeJsonBody
end

# Public: the default way to close a database connection **within a
# thread**. Used in the parallel processor.
DEFAULT_CLOSE_CONNECTION = Proc.new do
if defined?(ActiveRecord)
ActiveRecord::Base.connection.close
end
end

# Public: the middleware stack to use for processing the batch request as a
# whole..
def self.batch_stack(processor)
Expand Down
7 changes: 5 additions & 2 deletions lib/batch_api/processor.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'batch_api/processor/sequential'
require 'batch_api/processor/semiparallel'
require 'batch_api/operation'

module BatchApi
Expand Down Expand Up @@ -28,7 +29,9 @@ def initialize(request, app)
# provided in BatchApi setup and the request.
# Currently only Sequential is supported.
def strategy
BatchApi::Processor::Sequential
@request.params["sequential"] ?
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have an explicit option -- params["strategy"] or whatever: "sequential", "semiparallel", "parallel", etc.

BatchApi::Processor::Sequential :
BatchApi::Processor::Semiparallel
end

# Public: run the batch operations according to the appropriate strategy.
Expand Down Expand Up @@ -104,7 +107,7 @@ def self.operation_klass
#
# Returns the valid options hash.
def process_options
unless @request.params["sequential"]
unless @request.params.has_key?("sequential")
raise Errors::BadOptionError, "Sequential flag is currently required"
end
@request.params
Expand Down
70 changes: 70 additions & 0 deletions lib/batch_api/processor/semiparallel.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
module BatchApi
class Processor
class Semiparallel
# Public: initialize with the app.
def initialize(app)
@app = app
end

# Public: execute all operations sequentially.
#
# ops - a set of BatchApi::Operations
# options - a set of options
#
# Returns an array of BatchApi::Response objects.
def call(env)
ops = env[:ops].chunk do |op|
op.method == "get"
end.map do |is_get, operations|
if is_get && operations.length > 1
ParallelOps.new(operations)
else
operations
end
end.flatten

ops.collect do |op|
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments!

if op.is_a?(ParallelOps)
op.process(env)
else
# set the current op
env[:op] = op

# execute the individual request inside the operation-specific
# middeware, then clear out the current op afterward
middleware = InternalMiddleware.operation_stack
middleware.call(env).tap {|r| env.delete(:op) }
end
end.flatten
end

private

# Internal: Represents a collection of operations that can be safely
# executed in parallel.
#
# ops - the array of operations that may be executd in parallel.
class ParallelOps < Struct.new(:ops)
# Public: Processes the collection of parallelizable operations in a
# thread.
#
# NOTE: If we saw that the overhead is too big, we may use a thread pool
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see

# instead.
#
# Returns an ordered collection of results.
def process(env)
ops.map do |op|
Thread.new do
dupped = env.deep_dup
dupped[:op] = op
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pull these few lines out into a common method?


middleware = InternalMiddleware.operation_stack
middleware.call(dupped).tap(&BatchApi.config.close_connection)
end
end.map(&:join).map(&:value)
end
end
end
end
end

8 changes: 8 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,17 @@ config.middleware.use BatchApi::RackMiddleware do |batch_config|
batch_config.batch_middleware = Proc.new { }
# default middleware stack run for each individual operation
batch_config.operation_middleware = Proc.new { }

# block to close the current DB connection (used in non-sequential processing)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explain this a bit more -- why we need it

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

describe the options for processing type -- sequential or semiparallel and what it means,

batch_config.close_connection = Proc.new { DM.connection.close }
end
```

**Important**: by default, if BatchApi detects ActiveRecord loaded, the
close_connection proc will default to `ActiveRecord::Base.connection.close`. If
you don't use ActiveRecord you should override that proc so you never leave open
connections when using non-sequential mode.

That's it! Just fire up your curl, hit your endpoint with the right verb and a properly formatted request, and enjoy some batch API action.

## Why a Batch API?
Expand Down
67 changes: 67 additions & 0 deletions spec/lib/processor/semiparallel_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
require 'spec_helper'

class FakeOperationStack
def call(env)
"Called #{env[:op].method} #{env[:op].number}"
end
end

describe BatchApi::Processor::Semiparallel do

let(:app) { stub("app", call: stub) }
let(:semiparallel) { BatchApi::Processor::Semiparallel.new(app) }

describe "#call" do
let(:env) { {
ops: 10.times.collect {|i|
meth = (3..6).to_a.include?(i) ? 'get' : 'post'
stub("op #{i} #{meth}", method: meth, number: i)
}
} }
let(:stack) { FakeOperationStack.new }

before :each do
BatchApi::InternalMiddleware.stub(:operation_stack).and_return(stack)
end

it "creates an operation middleware stack and calls it for each op" do
env[:ops][0..2].each {|op|
stack.should_receive(:call).
with(hash_including(op: op)).ordered
}

env[:ops][3..6].each {|op|
stack.should_receive(:call).
with(hash_including(op: op))
}

env[:ops][7..9].each {|op|
stack.should_receive(:call).
with(hash_including(op: op)).ordered
}

semiparallel.call(env)
end

it "includes the rest of the env in the calls" do
stack.should_receive(:call).
with(hash_including(env)).exactly(10).times
semiparallel.call(env)
end

it "returns the results of the calls ordered" do
semiparallel.call(env).should == [
"Called post 0",
"Called post 1",
"Called post 2",
"Called get 3",
"Called get 4",
"Called get 5",
"Called get 6",
"Called post 7",
"Called post 8",
"Called post 9"
]
end
end
end
16 changes: 13 additions & 3 deletions spec/lib/processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
end

context "error conditions" do
it "(currently) throws an error if sequential is not true" do
it "(currently) throws an error if sequential is not specified" do
request.params.delete("sequential")
expect {
BatchApi::Processor.new(request, app)
Expand Down Expand Up @@ -88,8 +88,18 @@
end

describe "#strategy" do
it "returns BatchApi::Processor::Sequential" do
processor.strategy.should == BatchApi::Processor::Sequential
context 'if the sequential param is true' do
it "returns BatchApi::Processor::Sequential" do
request.params['sequential'] = true
processor.strategy.should == BatchApi::Processor::Sequential
end
end

context 'if the sequential param is false' do
it "returns BatchApi::Processor::Semiparallel" do
request.params['sequential'] = false
processor.strategy.should == BatchApi::Processor::Semiparallel
end
end
end

Expand Down