Add naive semiparallel processor #19
base: master
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| module BatchApi | ||
| class Processor | ||
| class Semiparallel | ||
| # Public: initialize with the app. | ||
| def initialize(app) | ||
| @app = app | ||
| end | ||
|
|
||
| # Public: execute all operations in order, running each run of two or | ||
| # more consecutive GET operations in parallel. | ||
| # | ||
| # env - the batch environment hash; env[:ops] holds the BatchApi::Operations | ||
| # | ||
| # Returns an array of BatchApi::Response objects. | ||
| def call(env) | ||
| ops = env[:ops].chunk do |op| | ||
| op.method == "get" | ||
| end.map do |is_get, operations| | ||
| if is_get && operations.length > 1 | ||
| ParallelOps.new(operations) | ||
| else | ||
| operations | ||
| end | ||
| end.flatten | ||
|
|
||
| ops.collect do |op| | ||
|
Owner
add comments!
||
| if op.is_a?(ParallelOps) | ||
| op.process(env) | ||
| else | ||
| # set the current op | ||
| env[:op] = op | ||
|
|
||
| # execute the individual request inside the operation-specific | ||
| # middleware, then clear out the current op afterward | ||
| middleware = InternalMiddleware.operation_stack | ||
| middleware.call(env).tap {|r| env.delete(:op) } | ||
| end | ||
| end.flatten | ||
| end | ||
|
|
||
| private | ||
|
|
||
| # Internal: Represents a collection of operations that can be safely | ||
| # executed in parallel. | ||
| # | ||
| # ops - the array of operations that may be executed in parallel. | ||
| class ParallelOps < Struct.new(:ops) | ||
| # Public: Processes the collection of parallelizable operations, each in | ||
| # its own thread. | ||
| # | ||
| # NOTE: If the overhead proves too big, we may use a thread pool | ||
|
Owner
see
||
| # instead. | ||
| # | ||
| # Returns an ordered collection of results. | ||
| def process(env) | ||
| ops.map do |op| | ||
| Thread.new do | ||
| dupped = env.deep_dup | ||
| dupped[:op] = op | ||
|
Owner
pull these few lines out into a common method?
||
|
|
||
| middleware = InternalMiddleware.operation_stack | ||
| middleware.call(dupped).tap(&BatchApi.config.close_connection) | ||
| end | ||
| end.map(&:join).map(&:value) | ||
| end | ||
| end | ||
| end | ||
| end | ||
| end | ||
|
|
||
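Reviewer's sketch of the grouping step in `#call`, for readers who have not used `Enumerable#chunk`: it splits the operations into runs of consecutive elements with the same block result, so only adjacent GETs end up grouped together. `Op`, `ParallelGroup`, and `group_ops` are hypothetical stand-ins for illustration, not names from this PR.

```ruby
# Hypothetical stand-in for BatchApi::Operation; only the fields used here.
Op = Struct.new(:method, :number)

# Hypothetical wrapper playing the role of ParallelOps in the diff.
ParallelGroup = Struct.new(:ops)

# Groups runs of two or more consecutive GETs; everything else stays as-is.
def group_ops(ops)
  ops.chunk { |op| op.method == "get" }.flat_map do |is_get, run|
    if is_get && run.length > 1
      [ParallelGroup.new(run)]   # one entry representing the whole run
    else
      run                        # non-GETs and lone GETs stay sequential
    end
  end
end

# Same layout as the spec: ops 3..6 are GETs, the rest are POSTs.
ops = 10.times.map { |i| Op.new((3..6).cover?(i) ? "get" : "post", i) }

summary = group_ops(ops).map do |entry|
  entry.is_a?(ParallelGroup) ? entry.ops.map(&:number) : entry.number
end
p summary  # => [0, 1, 2, [3, 4, 5, 6], 7, 8, 9]
```

Because a write between two GETs starts a new run, reads are never reordered across a non-GET operation.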
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -107,9 +107,17 @@ config.middleware.use BatchApi::RackMiddleware do |batch_config| | |
| batch_config.batch_middleware = Proc.new { } | ||
| # default middleware stack run for each individual operation | ||
| batch_config.operation_middleware = Proc.new { } | ||
|
|
||
| # block to close the current DB connection (used in non-sequential processing) | ||
|
Owner
explain this a bit more -- why we need it
Owner
describe the options for processing type -- sequential or semiparallel and what it means,
||
| batch_config.close_connection = Proc.new { DM.connection.close } | ||
| end | ||
| ``` | ||
|
|
||
| **Important**: by default, if BatchApi detects that ActiveRecord is loaded, the | ||
| `close_connection` proc defaults to `ActiveRecord::Base.connection.close`. If | ||
| you don't use ActiveRecord, override that proc so you never leave connections | ||
| open when using non-sequential mode. | ||
|
|
||
| That's it! Just fire up your curl, hit your endpoint with the right verb and a properly formatted request, and enjoy some batch API action. | ||
|
|
||
| ## Why a Batch API? | ||
|
|
||
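Reviewer's sketch of why `close_connection` matters in non-sequential mode, assuming a database layer that checks out one connection per thread. `FakePool` and `run_batch` are hypothetical and only model that behavior; they are not part of BatchApi or any real library.

```ruby
# Hypothetical stand-in for a per-thread connection pool; not a real library API.
class FakePool
  def initialize
    @lock = Mutex.new
    @checked_out = {}
  end

  # Lazily check out a connection for the calling thread.
  def connection
    @lock.synchronize { @checked_out[Thread.current] ||= Object.new }
  end

  # Return the calling thread's connection to the pool.
  def close
    @lock.synchronize { @checked_out.delete(Thread.current) }
  end

  def open_count
    @lock.synchronize { @checked_out.size }
  end
end

# Runs `count` fake operations, one thread each, in the style of
# ParallelOps#process.
def run_batch(pool, count, close_connection)
  threads = count.times.map do |i|
    Thread.new do
      pool.connection                       # the operation touches the DB
      "result #{i}".tap(&close_connection)  # release before the thread exits
    end
  end
  threads.map(&:value)
end

pool = FakePool.new
run_batch(pool, 4, proc { pool.close })
p pool.open_count   # => 0, every thread released its connection

leaky = FakePool.new
run_batch(leaky, 4, proc { })
p leaky.open_count  # => 4, connections stay checked out after the threads end
```

With an empty proc, each short-lived thread leaves its connection checked out, which is the leak the README note warns about.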
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| require 'spec_helper' | ||
|
|
||
| class FakeOperationStack | ||
| def call(env) | ||
| "Called #{env[:op].method} #{env[:op].number}" | ||
| end | ||
| end | ||
|
|
||
| describe BatchApi::Processor::Semiparallel do | ||
|
|
||
| let(:app) { stub("app", call: stub) } | ||
| let(:semiparallel) { BatchApi::Processor::Semiparallel.new(app) } | ||
|
|
||
| describe "#call" do | ||
| let(:env) { { | ||
| ops: 10.times.collect {|i| | ||
| meth = (3..6).to_a.include?(i) ? 'get' : 'post' | ||
| stub("op #{i} #{meth}", method: meth, number: i) | ||
| } | ||
| } } | ||
| let(:stack) { FakeOperationStack.new } | ||
|
|
||
| before :each do | ||
| BatchApi::InternalMiddleware.stub(:operation_stack).and_return(stack) | ||
| end | ||
|
|
||
| it "creates an operation middleware stack and calls it for each op" do | ||
| env[:ops][0..2].each {|op| | ||
| stack.should_receive(:call). | ||
| with(hash_including(op: op)).ordered | ||
| } | ||
|
|
||
| env[:ops][3..6].each {|op| | ||
| stack.should_receive(:call). | ||
| with(hash_including(op: op)) | ||
| } | ||
|
|
||
| env[:ops][7..9].each {|op| | ||
| stack.should_receive(:call). | ||
| with(hash_including(op: op)).ordered | ||
| } | ||
|
|
||
| semiparallel.call(env) | ||
| end | ||
|
|
||
| it "includes the rest of the env in the calls" do | ||
| stack.should_receive(:call). | ||
| with(hash_including(env)).exactly(10).times | ||
| semiparallel.call(env) | ||
| end | ||
|
|
||
| it "returns the results of the calls ordered" do | ||
| semiparallel.call(env).should == [ | ||
| "Called post 0", | ||
| "Called post 1", | ||
| "Called post 2", | ||
| "Called get 3", | ||
| "Called get 4", | ||
| "Called get 5", | ||
| "Called get 6", | ||
| "Called post 7", | ||
| "Called post 8", | ||
| "Called post 9" | ||
| ] | ||
| end | ||
| end | ||
| end |
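Reviewer's sketch of why the "returns the results of the calls ordered" expectation holds even though the GETs run concurrently: the threads are collected in creation order, and `Thread#value` blocks until that specific thread finishes. `run_in_parallel` is a hypothetical helper, and the sleeps only simulate requests of different durations.

```ruby
# Runs one thread per delay and returns the thread values in input order.
def run_in_parallel(delays)
  threads = delays.each_with_index.map do |delay, i|
    Thread.new do
      sleep delay          # simulate requests that finish at different times
      "Called get #{i}"
    end
  end
  # map walks the threads in creation order; value waits for each one,
  # so the order in which threads finish does not affect the result.
  threads.map(&:join).map(&:value)
end

p run_in_parallel([0.03, 0.01, 0.02])
# => ["Called get 0", "Called get 1", "Called get 2"]
```

`Thread#value` already joins the thread, so the `map(&:join)` step in the diff is harmless but not strictly required.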
Have an explicit option -- params["strategy"] or whatever: "sequential", "semiparallel", "parallel", etc.
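One possible shape for that option, as a rough sketch only. The `"strategy"` key comes from the comment above; `StrategySketch`, `processor_for`, the placeholder classes, and the choice of `"sequential"` as the default are assumptions for illustration and are not code from this PR.

```ruby
module StrategySketch
  # Placeholder classes standing in for the real processors.
  class Sequential; end
  class Semiparallel; end

  # Maps the user-facing strategy name to a processor class.
  STRATEGIES = {
    "sequential"   => Sequential,
    "semiparallel" => Semiparallel
  }.freeze

  # Picks a processor from the request params, defaulting to sequential
  # (an assumed default) and rejecting unknown names explicitly.
  def self.processor_for(params)
    name = params.fetch("strategy", "sequential")
    STRATEGIES.fetch(name) do
      raise ArgumentError, "unknown strategy: #{name.inspect}"
    end
  end
end

p StrategySketch.processor_for({})                             # => StrategySketch::Sequential
p StrategySketch.processor_for("strategy" => "semiparallel")   # => StrategySketch::Semiparallel
```

Failing loudly on an unknown name keeps a typo from silently falling back to a different execution order; a `"parallel"` entry could be added to the table once such a processor exists.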