Class: Dynflow::Testing::InThreadExecutor
- Inherits: Object
- Inheritance hierarchy: Object → Dynflow::Testing::InThreadExecutor
- Defined in:
- lib/dynflow/testing/in_thread_executor.rb
Instance Method Summary collapse
- #clock_tick ⇒ Object
- #delayed_event(director_event) ⇒ Object
- #event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
- #execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true) ⇒ Object
- #feed_queue(work_items) ⇒ Object
- #handle_work(work_item) ⇒ Object
- #initialize(world) ⇒ InThreadExecutor (constructor) — Returns a new instance of InThreadExecutor.
- #plan_events(delayed_events) ⇒ Object
- #process_work_items ⇒ Object
- #terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
Constructor Details
#initialize(world) ⇒ InThreadExecutor
Returns a new instance of InThreadExecutor.
6 7 8 9 10 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 6
#
# Sets up the executor state: the world it belongs to, a Director built for
# that world, and an empty queue of pending work items.
#
# @param world the world this executor runs work for
def initialize(world)
  @work_items = Queue.new
  @world = world
  @director = Director.new(world)
end
Instance Method Details
#clock_tick ⇒ Object
53 54 55 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 53
#
# Advances the world's clock by calling +progress_all+ with the one-element
# list +[:periodic_check_inbox]+.
# NOTE(review): the list presumably names timers the clock should leave
# alone -- confirm against the clock implementation.
#
# @return whatever the clock's +progress_all+ returns
def clock_tick
  subjects = [:periodic_check_inbox]
  clock = @world.clock
  clock.progress_all(subjects)
end
#delayed_event(director_event) ⇒ Object
46 47 48 49 50 51 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 46
#
# Hands an already-built director event to the director and queues the work
# items it produces. The items are only queued here; nothing in this method
# runs them.
#
# @param director_event the event to pass to the director's +handle_event+
# @return the +result+ of +director_event+
def delayed_event(director_event)
  # Queue through feed_queue so these items get the world assigned, the same
  # as every other item that enters the queue.
  feed_queue(@director.handle_event(director_event))
  director_event.result
end
#event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 38
#
# Wraps the given event in a Director::Event with a fresh UUID, passes it to
# the director and queues the work items that come back. The items are only
# queued here; nothing in this method runs them.
#
# @param execution_plan_id id of the execution plan the event targets
# @param step_id id of the step the event targets
# @param event the event payload
# @param future future carried inside the Director::Event
# @param optional flag carried inside the Director::Event
# @return the +future+ that was passed in (or the default one)
def event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future, optional: false)
  director_event = Director::Event[SecureRandom.uuid, execution_plan_id, step_id, event, future, optional]
  # Queue through feed_queue so these items get the world assigned, the same
  # as every other item that enters the queue.
  feed_queue(@director.handle_event(director_event))
  future
end
#execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true) ⇒ Object
12 13 14 15 16 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 12
#
# Starts the execution plan via the director, queues the initial work items
# and then drains the queue in the calling thread before returning.
#
# @param execution_plan_id id of the execution plan to run
# @param finished future handed to the director's +start_execution+
# @param _wait_for_acceptance unused by this executor
# @return the +finished+ future
def execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true)
  initial_items = @director.start_execution(execution_plan_id, finished)
  feed_queue(initial_items)
  process_work_items
  finished
end
#feed_queue(work_items) ⇒ Object
57 58 59 60 61 62 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 57
#
# Assigns this executor's world to each work item and appends the item to
# the pending queue, in the order given.
#
# @param work_items collection of work items responding to +world=+
# @return whatever +work_items.each+ returns
def feed_queue(work_items)
  work_items.each do |item|
    item.world = @world
    @work_items << item
  end
end
#handle_work(work_item) ⇒ Object
31 32 33 34 35 36 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 31
#
# Executes one work item, plans any delayed events its step carries, and
# reports the item to the director as finished.
#
# @param work_item the work item to run; must respond to +execute+
# @return the result of the director's +work_finished+ call
def handle_work(work_item)
  work_item.execute
  # Only step work items are asked for a step; for anything else +step+ is nil.
  step = work_item.step if work_item.is_a?(Director::StepWorkItem)
  # Read the delayed events once instead of once for the guard and again for
  # the argument.
  delayed_events = step && step.delayed_events
  plan_events(delayed_events) if delayed_events
  @director.work_finished(work_item)
end
#plan_events(delayed_events) ⇒ Object
25 26 27 28 29 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 25
#
# Asks the world to plan each delayed event, passing along its execution
# plan id, step id, payload and time.
#
# @param delayed_events collection of objects responding to
#   +execution_plan_id+, +step_id+, +event+ and +time+
# @return whatever +delayed_events.each+ returns
def plan_events(delayed_events)
  delayed_events.each do |delayed|
    @world.plan_event(
      delayed.execution_plan_id,
      delayed.step_id,
      delayed.event,
      delayed.time
    )
  end
end
#process_work_items ⇒ Object
18 19 20 21 22 23 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 18
#
# Drains the pending queue in the calling thread: each item is handled, the
# work items produced by handling it are queued, and the clock is ticked
# once per handled item. Stops when the queue is empty.
#
# @return [nil]
def process_work_items
  until @work_items.empty?
    current = @work_items.shift
    follow_ups = handle_work(current)
    feed_queue(follow_ups)
    clock_tick
  end
end
#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
64 65 66 67 68 69 |
# File 'lib/dynflow/testing/in_thread_executor.rb', line 64
#
# Terminates the director and reports the outcome through +future+: it is
# fulfilled with +true+ on success and rejected with the raised error when
# a StandardError escapes.
#
# @param future future to resolve with the outcome
# @return the result of +future.fulfill+ or +future.reject+
def terminate(future = Concurrent::Promises.resolvable_future)
  @director.terminate
  future.fulfill(true)
rescue StandardError => error
  future.reject(error)
end