From c71e86b1c73050710cbe7ec93bcbdf716d7228bf Mon Sep 17 00:00:00 2001 From: "Josep M. Bach" Date: Tue, 4 Dec 2012 11:22:34 +0100 Subject: [PATCH 1/2] Add naive semiparallel processor --- lib/batch_api/processor.rb | 7 ++- lib/batch_api/processor/semiparallel.rb | 70 +++++++++++++++++++++++++ spec/lib/processor/semiparallel_spec.rb | 67 +++++++++++++++++++++++ spec/lib/processor_spec.rb | 16 ++++-- 4 files changed, 155 insertions(+), 5 deletions(-) create mode 100644 lib/batch_api/processor/semiparallel.rb create mode 100644 spec/lib/processor/semiparallel_spec.rb diff --git a/lib/batch_api/processor.rb b/lib/batch_api/processor.rb index 40738fd..33d932d 100644 --- a/lib/batch_api/processor.rb +++ b/lib/batch_api/processor.rb @@ -1,4 +1,5 @@ require 'batch_api/processor/sequential' +require 'batch_api/processor/semiparallel' require 'batch_api/operation' module BatchApi @@ -28,7 +29,9 @@ def initialize(request, app) # provided in BatchApi setup and the request. # Currently only Sequential is supported. def strategy - BatchApi::Processor::Sequential + @request.params["sequential"] ? + BatchApi::Processor::Sequential : + BatchApi::Processor::Semiparallel end # Public: run the batch operations according to the appropriate strategy. @@ -104,7 +107,7 @@ def self.operation_klass # # Returns the valid options hash. def process_options - unless @request.params["sequential"] + unless @request.params.has_key?("sequential") raise Errors::BadOptionError, "Sequential flag is currently required" end @request.params diff --git a/lib/batch_api/processor/semiparallel.rb b/lib/batch_api/processor/semiparallel.rb new file mode 100644 index 0000000..a4df072 --- /dev/null +++ b/lib/batch_api/processor/semiparallel.rb @@ -0,0 +1,70 @@ +module BatchApi + class Processor + class Semiparallel + # Public: initialize with the app. + def initialize(app) + @app = app + end + + # Public: execute all operations sequentially. + # + # ops - a set of BatchApi::Operations + # options - a set of options + # + # Returns an array of BatchApi::Response objects. + def call(env) + ops = env[:ops].chunk do |op| + op.method == "get" + end.map do |is_get, operations| + if is_get && operations.length > 1 + ParallelOps.new(operations) + else + operations + end + end.flatten + + ops.collect do |op| + if op.is_a?(ParallelOps) + op.process(env) + else + # set the current op + env[:op] = op + + # execute the individual request inside the operation-specific + # middeware, then clear out the current op afterward + middleware = InternalMiddleware.operation_stack + middleware.call(env).tap {|r| env.delete(:op) } + end + end.flatten + end + + private + + # Internal: Represents a collection of operations that can be safely + # executed in parallel. + # + # ops - the array of operations that may be executd in parallel. + class ParallelOps < Struct.new(:ops) + # Public: Processes the collection of parallelizable operations in a + # thread. + # + # NOTE: If we saw that the overhead is too big, we may use a thread pool + # instead. + # + # Returns an ordered collection of results. + def process(env) + ops.map do |op| + Thread.new do + dupped = env.deep_dup + dupped[:op] = op + + middleware = InternalMiddleware.operation_stack + middleware.call(dupped) + end + end.map(&:join).map(&:value) + end + end + end + end +end + diff --git a/spec/lib/processor/semiparallel_spec.rb b/spec/lib/processor/semiparallel_spec.rb new file mode 100644 index 0000000..c786264 --- /dev/null +++ b/spec/lib/processor/semiparallel_spec.rb @@ -0,0 +1,67 @@ +require 'spec_helper' + +class FakeOperationStack + def call(env) + "Called #{env[:op].method} #{env[:op].number}" + end +end + +describe BatchApi::Processor::Semiparallel do + + let(:app) { stub("app", call: stub) } + let(:semiparallel) { BatchApi::Processor::Semiparallel.new(app) } + + describe "#call" do + let(:env) { { + ops: 10.times.collect {|i| + meth = (3..6).to_a.include?(i) ? 'get' : 'post' + stub("op #{i} #{meth}", method: meth, number: i) + } + } } + let(:stack) { FakeOperationStack.new } + + before :each do + BatchApi::InternalMiddleware.stub(:operation_stack).and_return(stack) + end + + it "creates an operation middleware stack and calls it for each op" do + env[:ops][0..2].each {|op| + stack.should_receive(:call). + with(hash_including(op: op)).ordered + } + + env[:ops][3..6].each {|op| + stack.should_receive(:call). + with(hash_including(op: op)) + } + + env[:ops][7..9].each {|op| + stack.should_receive(:call). + with(hash_including(op: op)).ordered + } + + semiparallel.call(env) + end + + it "includes the rest of the env in the calls" do + stack.should_receive(:call). + with(hash_including(env)).exactly(10).times + semiparallel.call(env) + end + + it "returns the results of the calls ordered" do + semiparallel.call(env).should == [ + "Called post 0", + "Called post 1", + "Called post 2", + "Called get 3", + "Called get 4", + "Called get 5", + "Called get 6", + "Called post 7", + "Called post 8", + "Called post 9" + ] + end + end +end diff --git a/spec/lib/processor_spec.rb b/spec/lib/processor_spec.rb index 9830daa..0153abe 100644 --- a/spec/lib/processor_spec.rb +++ b/spec/lib/processor_spec.rb @@ -59,7 +59,7 @@ end context "error conditions" do - it "(currently) throws an error if sequential is not true" do + it "(currently) throws an error if sequential is not specified" do request.params.delete("sequential") expect { BatchApi::Processor.new(request, app) @@ -88,8 +88,18 @@ end describe "#strategy" do - it "returns BatchApi::Processor::Sequential" do - processor.strategy.should == BatchApi::Processor::Sequential + context 'if the sequential param is true' do + it "returns BatchApi::Processor::Sequential" do + request.params['sequential'] = true + processor.strategy.should == BatchApi::Processor::Sequential + end + end + + context 'if the sequential param is false' do + it "returns BatchApi::Processor::Semiparallel" do + request.params['sequential'] = false + processor.strategy.should == BatchApi::Processor::Semiparallel + end end end From 260eee3b8c3252caf33c90816c24ba2699020573 Mon Sep 17 00:00:00 2001 From: "Josep M. Bach" Date: Tue, 4 Dec 2012 11:43:32 +0100 Subject: [PATCH 2/2] Add configurable close_connection and make it work in latest 1.9.3 --- Gemfile | 2 +- Gemfile.lock | 9 --------- lib/batch_api/configuration.rb | 3 ++- lib/batch_api/internal_middleware.rb | 8 ++++++++ lib/batch_api/processor/semiparallel.rb | 2 +- readme.md | 8 ++++++++ 6 files changed, 20 insertions(+), 12 deletions(-) diff --git a/Gemfile b/Gemfile index 86d3189..71005be 100644 --- a/Gemfile +++ b/Gemfile @@ -12,7 +12,7 @@ group :development, :test do gem 'guard-rspec' gem 'faker' gem 'timecop' - gem "debugger", platform: :mri + # gem "debugger", platform: :mri if RUBY_PLATFORM =~ /darwin/ # OS X integration diff --git a/Gemfile.lock b/Gemfile.lock index 2adb663..69b3f49 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -37,14 +37,6 @@ GEM arel (3.0.2) builder (3.0.4) coderay (1.0.8) - columnize (0.3.6) - debugger (1.2.1) - columnize (>= 0.3.1) - debugger-linecache (~> 1.1.1) - debugger-ruby_core_source (~> 1.1.4) - debugger-linecache (1.1.2) - debugger-ruby_core_source (>= 1.1.1) - debugger-ruby_core_source (1.1.4) diff-lcs (1.1.3) erubis (2.7.0) faker (1.1.2) @@ -143,7 +135,6 @@ PLATFORMS DEPENDENCIES batch_api! - debugger faker guard guard-rspec diff --git a/lib/batch_api/configuration.rb b/lib/batch_api/configuration.rb index fb16190..074eecd 100644 --- a/lib/batch_api/configuration.rb +++ b/lib/batch_api/configuration.rb @@ -20,7 +20,8 @@ module BatchApi endpoint: "/batch", limit: 50, batch_middleware: InternalMiddleware::DEFAULT_BATCH_MIDDLEWARE, - operation_middleware: InternalMiddleware::DEFAULT_OPERATION_MIDDLEWARE + operation_middleware: InternalMiddleware::DEFAULT_OPERATION_MIDDLEWARE, + close_connection: InternalMiddleware::DEFAULT_CLOSE_CONNECTION } # Batch API Configuration diff --git a/lib/batch_api/internal_middleware.rb b/lib/batch_api/internal_middleware.rb index d33d2cd..90a60ee 100644 --- a/lib/batch_api/internal_middleware.rb +++ b/lib/batch_api/internal_middleware.rb @@ -62,6 +62,14 @@ module InternalMiddleware use InternalMiddleware::DecodeJsonBody end + # Public: the default way to close a database connection **within a + # thread**. Used in the parallel processor. + DEFAULT_CLOSE_CONNECTION = Proc.new do + if defined?(ActiveRecord) + ActiveRecord::Base.connection.close + end + end + # Public: the middleware stack to use for processing the batch request as a # whole.. def self.batch_stack(processor) diff --git a/lib/batch_api/processor/semiparallel.rb b/lib/batch_api/processor/semiparallel.rb index a4df072..c3b05c9 100644 --- a/lib/batch_api/processor/semiparallel.rb +++ b/lib/batch_api/processor/semiparallel.rb @@ -59,7 +59,7 @@ def process(env) dupped[:op] = op middleware = InternalMiddleware.operation_stack - middleware.call(dupped) + middleware.call(dupped).tap(&BatchApi.config.close_connection) end end.map(&:join).map(&:value) end diff --git a/readme.md b/readme.md index fb4edf8..0610c9f 100644 --- a/readme.md +++ b/readme.md @@ -107,9 +107,17 @@ config.middleware.use BatchApi::RackMiddleware do |batch_config| batch_config.batch_middleware = Proc.new { } # default middleware stack run for each individual operation batch_config.operation_middleware = Proc.new { } + + # block to close the current DB connection (used in non-sequential processing) + batch_config.close_connection = Proc.new { DM.connection.close } end ``` +**Important**: by default, if BatchApi detects ActiveRecord loaded, the +close_connection proc will default to `ActiveRecord::Base.connection.close`. If +you don't use ActiveRecord you should override that proc so you never leave open +connections when using non-sequential mode. + That's it! Just fire up your curl, hit your endpoint with the right verb and a properly formatted request, and enjoy some batch API action. ## Why a Batch API?