diff --git a/.ruby-version b/.ruby-version index 8e17352..d538d61 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -ruby-1.9.3-p448 +ruby-2.2.0 \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index efd10ac..457cdc9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,3 @@ language: ruby rvm: - - 1.9.3 - - rbx-19mode - - jruby-19mode -matrix: - allow_failures: - - rvm: jruby-19mode + - 2.2.0 diff --git a/Gemfile b/Gemfile index 86d3189..03d6807 100644 --- a/Gemfile +++ b/Gemfile @@ -12,9 +12,10 @@ group :development, :test do gem 'guard-rspec' gem 'faker' gem 'timecop' - gem "debugger", platform: :mri + gem 'pry' + gem 'test-unit' - if RUBY_PLATFORM =~ /darwin/ + group :darwin do # OS X integration gem "ruby_gntp" gem "rb-fsevent" diff --git a/Gemfile.lock b/Gemfile.lock index a1e58ed..2a1cefd 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,6 +2,7 @@ PATH remote: . specs: batch_api (0.2.1) + celluloid middleware GEM @@ -36,14 +37,9 @@ GEM multi_json (~> 1.0) arel (3.0.2) builder (3.0.4) + celluloid (0.16.0) + timers (~> 4.0.0) coderay (1.0.8) - columnize (0.3.6) - debugger (1.6.1) - columnize (>= 0.3.1) - debugger-linecache (~> 1.2.0) - debugger-ruby_core_source (~> 1.2.3) - debugger-linecache (1.2.0) - debugger-ruby_core_source (1.2.3) diff-lcs (1.1.3) erubis (2.7.0) faker (1.1.2) @@ -57,9 +53,10 @@ GEM guard (>= 1.1) rspec (~> 2.11) hike (1.2.1) + hitimes (1.2.2) i18n (0.6.1) journey (1.0.4) - json (1.7.5) + json (1.8.2) listen (0.5.3) lumberjack (1.0.2) mail (2.4.4) @@ -71,6 +68,7 @@ GEM mime-types (1.19) multi_json (1.3.6) polyglot (0.3.3) + power_assert (0.2.2) pry (0.9.10) coderay (~> 1.0.5) method_source (~> 0.8) @@ -128,10 +126,14 @@ GEM hike (~> 1.2) rack (~> 1.0) tilt (~> 1.1, != 1.3.0) - sqlite3 (1.3.6) + sqlite3 (1.3.10) + test-unit (3.0.9) + power_assert thor (0.16.0) tilt (1.3.3) timecop (0.5.3) + timers (4.0.1) + hitimes treetop (1.4.11) polyglot polyglot (>= 0.3.1) @@ -142,10 +144,10 @@ PLATFORMS DEPENDENCIES batch_api! - debugger faker guard guard-rspec + pry rack-contrib rails (~> 3.2) rb-fsevent @@ -154,4 +156,5 @@ DEPENDENCIES ruby_gntp sinatra sqlite3 + test-unit timecop diff --git a/batch_api.gemspec b/batch_api.gemspec index f374eb7..f97a32e 100644 --- a/batch_api.gemspec +++ b/batch_api.gemspec @@ -18,7 +18,8 @@ Gem::Specification.new do |s| s.test_files = Dir["spec/**/*"] s.add_runtime_dependency("middleware") - + s.add_runtime_dependency("celluloid") + s.add_development_dependency("rails", "~> 3.2") s.add_development_dependency("sinatra") s.add_development_dependency("rspec") diff --git a/lib/batch_api/configuration.rb b/lib/batch_api/configuration.rb index fee2a1f..d15a0d4 100644 --- a/lib/batch_api/configuration.rb +++ b/lib/batch_api/configuration.rb @@ -19,6 +19,7 @@ module BatchApi verb: :post, endpoint: "/batch", limit: 50, + parallel_size: 10, batch_middleware: InternalMiddleware::DEFAULT_BATCH_MIDDLEWARE, operation_middleware: InternalMiddleware::DEFAULT_OPERATION_MIDDLEWARE } diff --git a/lib/batch_api/internal_middleware.rb b/lib/batch_api/internal_middleware.rb index d33d2cd..075f26f 100644 --- a/lib/batch_api/internal_middleware.rb +++ b/lib/batch_api/internal_middleware.rb @@ -1,5 +1,6 @@ require 'middleware' require 'batch_api/processor/sequential' +require 'batch_api/processor/parallel' require 'batch_api/processor/executor' require 'batch_api/internal_middleware/decode_json_body' require 'batch_api/internal_middleware/response_filter' @@ -68,7 +69,6 @@ def self.batch_stack(processor) Middleware::Builder.new do # evaluate these in the context of the middleware stack self.instance_eval &BatchApi.config.batch_middleware - # for now, everything's sequential, but that will change use processor.strategy end end diff --git a/lib/batch_api/internal_middleware/decode_json_body.rb b/lib/batch_api/internal_middleware/decode_json_body.rb index 0d49aa9..8b59d7d 100644 --- a/lib/batch_api/internal_middleware/decode_json_body.rb +++ b/lib/batch_api/internal_middleware/decode_json_body.rb @@ -10,7 +10,7 @@ def initialize(app) def call(env) @app.call(env).tap do |result| - result.body = MultiJson.load(result.body) if should_decode?(result) + result.body = MultiJson.load(result.body) if (should_decode?(result) && !result.body.blank?) end end diff --git a/lib/batch_api/parallel_actor.rb b/lib/batch_api/parallel_actor.rb new file mode 100644 index 0000000..b361b87 --- /dev/null +++ b/lib/batch_api/parallel_actor.rb @@ -0,0 +1,21 @@ +require 'celluloid' + +module BatchApi + class ParallelActor + include ::Celluloid + + def run(env) + middleware = InternalMiddleware.operation_stack + + if defined?(ActiveRecord) + ActiveRecord::Base.connection_pool.with_connection do + middleware.call(env).tap {|r| env.delete(:op) } + end + else + middleware.call(env).tap {|r| env.delete(:op) } + end + end + end +end + + \ No newline at end of file diff --git a/lib/batch_api/processor.rb b/lib/batch_api/processor.rb index 40738fd..f37b112 100644 --- a/lib/batch_api/processor.rb +++ b/lib/batch_api/processor.rb @@ -28,7 +28,7 @@ def initialize(request, app) # provided in BatchApi setup and the request. # Currently only Sequential is supported. def strategy - BatchApi::Processor::Sequential + @request.params["sequential"] ? BatchApi::Processor::Sequential : BatchApi::Processor::Parallel end # Public: run the batch operations according to the appropriate strategy. @@ -95,18 +95,11 @@ def self.operation_klass end # Internal: Processes any other provided options for validity. - # Currently, the :sequential option is REQUIRED (until parallel - # implementation is created). # # options - an options hash # - # Raises Errors::BadOptionError if sequential is not provided. - # # Returns the valid options hash. def process_options - unless @request.params["sequential"] - raise Errors::BadOptionError, "Sequential flag is currently required" - end @request.params end end diff --git a/lib/batch_api/processor/parallel.rb b/lib/batch_api/processor/parallel.rb new file mode 100644 index 0000000..e12000f --- /dev/null +++ b/lib/batch_api/processor/parallel.rb @@ -0,0 +1,34 @@ +require 'batch_api/parallel_actor' +module BatchApi + class Processor + class Parallel + + # Public: initialize with the app. + def initialize(app) + @app = app + end + + # Public: execute all operations sequentially. + # + # ops - a set of BatchApi::Operations + # options - a set of options + # + # Returns an array of BatchApi::Response objects. + def call(env) + futures = env[:ops].map do |op| + _env = BatchApi::Utils.deep_dup(env) + _env[:op] = op + self.class.get_actor_pool.future.run(_env) + end + futures.map do |future| + future.value + end + end + + def self.get_actor_pool + Celluloid::Actor[:batch_parallel_pool] ||= BatchApi::ParallelActor.pool(size: BatchApi.config.parallel_size) + end + end + end +end + diff --git a/spec/integration/rails_spec.rb b/spec/integration/rails_spec.rb index 29dda87..db90fb7 100644 --- a/spec/integration/rails_spec.rb +++ b/spec/integration/rails_spec.rb @@ -6,5 +6,6 @@ BatchApi.stub(:rails?).and_return(true) end - it_should_behave_like "integrating with a server" + it_should_behave_like "integrating with a server", true + it_should_behave_like "integrating with a server", false end diff --git a/spec/integration/shared_examples.rb b/spec/integration/shared_examples.rb index 7ba6b20..b8c4c7a 100644 --- a/spec/integration/shared_examples.rb +++ b/spec/integration/shared_examples.rb @@ -19,7 +19,7 @@ end end -shared_examples_for "integrating with a server" do +shared_examples_for "integrating with a server" do |sequential| def headerize(hash) Hash[hash.map do |k, v| ["HTTP_#{k.to_s.upcase}", v.to_s] @@ -153,7 +153,7 @@ def headerize(hash) failed_silent_request, get_by_default_request ], - sequential: true + sequential: sequential }.to_json, "CONTENT_TYPE" => "application/json" end diff --git a/spec/integration/sinatra_integration_spec.rb b/spec/integration/sinatra_integration_spec.rb index 3b30254..d391777 100644 --- a/spec/integration/sinatra_integration_spec.rb +++ b/spec/integration/sinatra_integration_spec.rb @@ -10,5 +10,6 @@ def app SinatraApp end - it_should_behave_like "integrating with a server" + it_should_behave_like "integrating with a server", true + it_should_behave_like "integrating with a server", false end diff --git a/spec/lib/internal_middleware/decode_json_body_spec.rb b/spec/lib/internal_middleware/decode_json_body_spec.rb index c724338..1374850 100644 --- a/spec/lib/internal_middleware/decode_json_body_spec.rb +++ b/spec/lib/internal_middleware/decode_json_body_spec.rb @@ -33,5 +33,12 @@ decoder.call(env).body.should == MultiJson.dump(json) end end + + context "for blank responses" do + it "doesn't try to parse" do + result.body = '' + decoder.call(env).body.should == '' + end + end end end diff --git a/spec/lib/processor/parallel_spec.rb b/spec/lib/processor/parallel_spec.rb new file mode 100644 index 0000000..d66f2f3 --- /dev/null +++ b/spec/lib/processor/parallel_spec.rb @@ -0,0 +1,39 @@ +require 'spec_helper' + +describe BatchApi::Processor::Parallel do + + let(:app) { stub("app", call: stub) } + let(:parallel) { BatchApi::Processor::Parallel.new(app) } + + describe "#call" do + let(:call_results) { 3.times.collect {|i| stub("called #{i}") } } + let(:env) { { + ops: 3.times.collect {|i| stub("op #{i}") } + } } + let(:op_middleware) { stub("middleware", call: {}) } + + before :each do + BatchApi::InternalMiddleware. + stub(:operation_stack).and_return(op_middleware) + op_middleware.stub(:call).and_return(*call_results) + end + + it "creates an operation middleware stack and calls it for each op" do + env[:ops].each {|op| + op_middleware.should_receive(:call). + with(hash_including(op: op)) + } + parallel.call(env) + end + + it "includes the rest of the env in the calls" do + op_middleware.should_receive(:call). + with(hash_including(env)).exactly(3).times + parallel.call(env) + end + + it "returns the results of the calls" do + parallel.call(env).should =~ call_results + end + end +end \ No newline at end of file diff --git a/spec/lib/processor_spec.rb b/spec/lib/processor_spec.rb index 9830daa..7b506b8 100644 --- a/spec/lib/processor_spec.rb +++ b/spec/lib/processor_spec.rb @@ -59,12 +59,6 @@ end context "error conditions" do - it "(currently) throws an error if sequential is not true" do - request.params.delete("sequential") - expect { - BatchApi::Processor.new(request, app) - }.to raise_exception(BatchApi::Errors::BadOptionError) - end it "raise a OperationLimitExceeded error if too many ops provided" do ops = (BatchApi.config.limit + 1).to_i.times.collect {|i| i} @@ -91,6 +85,13 @@ it "returns BatchApi::Processor::Sequential" do processor.strategy.should == BatchApi::Processor::Sequential end + it "returns BatchApi::Processor::Parallel" do + request = Rack::Request.new(env).tap do |r| + r.stub(:params).and_return({}.merge("ops" => ops).merge("sequential" => false)) + end + processor = BatchApi::Processor.new(request, app) + processor.strategy.should == BatchApi::Processor::Parallel + end end describe "#execute!" do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 7fbf890..9dc714a 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -20,6 +20,7 @@ RSpec.configure do |config| config.before :each do BatchApi.config.limit = 20 + BatchApi.config.parallel_size = 11 BatchApi.config.endpoint = "/batch" BatchApi.config.verb = :post