Class: Dynflow::Testing::InThreadExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/dynflow/testing/in_thread_executor.rb

Instance Method Summary collapse

Constructor Details

#initialize(world) ⇒ InThreadExecutor

Returns a new instance of InThreadExecutor.



6
7
8
9
10
# File 'lib/dynflow/testing/in_thread_executor.rb', line 6

# Sets up the executor's state: the world it works for, a Director created
# for that world, and an empty queue of pending work items.
#
# @param world the world passed to the Director and remembered for later use
def initialize(world)
  @world      = world
  @director   = Director.new(world)
  @work_items = Queue.new
end

Instance Method Details

#clock_tick ⇒ Object



53
54
55
# File 'lib/dynflow/testing/in_thread_executor.rb', line 53

# Asks the world's clock to progress everything, passing
# +[:periodic_check_inbox]+ as the argument.
#
# NOTE(review): the argument is presumably a list of subjects the clock should
# skip -- confirm against the clock implementation.
#
# @return whatever the clock's +progress_all+ returns
def clock_tick
  clock = @world.clock
  clock.progress_all(%i[periodic_check_inbox])
end

#delayed_event(director_event) ⇒ Object



46
47
48
49
50
51
# File 'lib/dynflow/testing/in_thread_executor.rb', line 46

# Hands an already-built director event to the director and queues every work
# item the director returns. The queued items are not processed here.
#
# NOTE(review): items are appended to the queue directly rather than through
# feed_queue, so +work_item.world+ is not assigned here -- confirm intended.
#
# @param director_event the event to handle; must respond to +result+
# @return the event's +result+
def delayed_event(director_event)
  produced = @director.handle_event(director_event)
  produced.each { |item| @work_items << item }
  director_event.result
end

#event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/dynflow/testing/in_thread_executor.rb', line 38

# Wraps the arguments in a Director::Event identified by a fresh UUID, lets the
# director handle it, and queues every work item the director returns. The
# queued items are not processed here.
#
# NOTE(review): items are appended to the queue directly rather than through
# feed_queue, so +work_item.world+ is not assigned here -- confirm intended.
#
# @param execution_plan_id id of the execution plan the event targets
# @param step_id id of the step the event targets
# @param event the event payload
# @param future object stored in the Director::Event and returned to the caller
# @param optional flag stored in the Director::Event
# @return the +future+ that was passed in (or the default one)
def event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future, optional: false)
  director_event = Director::Event[SecureRandom.uuid, execution_plan_id, step_id, event, future, optional]
  @director.handle_event(director_event).each { |item| @work_items << item }
  future
end

#execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true) ⇒ Object



12
13
14
15
16
# File 'lib/dynflow/testing/in_thread_executor.rb', line 12

# Starts an execution plan on the director, queues the work items it returns
# and processes the queue in the calling thread before returning.
#
# @param execution_plan_id id of the plan to run
# @param finished object handed to the director's +start_execution+ and returned
# @param _wait_for_acceptance unused here
# @return the +finished+ object that was passed in (or the default one)
def execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true)
  initial_items = @director.start_execution(execution_plan_id, finished)
  feed_queue(initial_items)
  process_work_items
  finished
end

#feed_queue(work_items) ⇒ Object



57
58
59
60
61
62
# File 'lib/dynflow/testing/in_thread_executor.rb', line 57

# Assigns this executor's world to each work item and appends the item to the
# pending queue, preserving order.
#
# @param work_items collection of work items; each must respond to +world=+
# @return the +work_items+ collection that was passed in
def feed_queue(work_items)
  work_items.each do |item|
    item.world = @world
    @work_items << item
  end
end

#handle_work(work_item) ⇒ Object



31
32
33
34
35
36
# File 'lib/dynflow/testing/in_thread_executor.rb', line 31

# Executes a single work item, plans any delayed events its step produced, and
# reports the item as finished to the director.
#
# Only Director::StepWorkItem instances are asked for a step; for any other
# work item no delayed events are planned.
#
# @param work_item the item to run; must respond to +execute+
# @return whatever the director's +work_finished+ returns
#   (process_work_items feeds that value back into the queue)
def handle_work(work_item)
  work_item.execute
  step = work_item.step if work_item.is_a?(Director::StepWorkItem)
  # Read the delayed events once and reuse the value for both the check and the call.
  delayed_events = step && step.delayed_events
  plan_events(delayed_events) if delayed_events
  @director.work_finished(work_item)
end

#plan_events(delayed_events) ⇒ Object



25
26
27
28
29
# File 'lib/dynflow/testing/in_thread_executor.rb', line 25

# Forwards each delayed event to the world's +plan_event+, passing its
# execution plan id, step id, event payload and time, in that order.
#
# @param delayed_events collection of objects responding to
#   +execution_plan_id+, +step_id+, +event+ and +time+
# @return the +delayed_events+ collection that was passed in
def plan_events(delayed_events)
  delayed_events.each do |delayed|
    plan_id = delayed.execution_plan_id
    step_id = delayed.step_id
    payload = delayed.event
    at = delayed.time
    @world.plan_event(plan_id, step_id, payload, at)
  end
end

#process_work_items ⇒ Object



18
19
20
21
22
23
# File 'lib/dynflow/testing/in_thread_executor.rb', line 18

# Drains the pending queue in the calling thread. Each popped item is handled,
# the value returned by handle_work is fed back into the queue, and the clock
# is ticked once per handled item. Stops when the queue is empty.
#
# @return [nil]
def process_work_items
  until @work_items.empty?
    current = @work_items.pop
    follow_ups = handle_work(current)
    feed_queue(follow_ups)
    clock_tick
  end
end

#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object



64
65
66
67
68
69
# File 'lib/dynflow/testing/in_thread_executor.rb', line 64

# Terminates the director and reports the outcome through +future+: fulfilled
# with +true+ on success, rejected with the error if a StandardError is raised.
#
# @param future object responding to +fulfill+ and +reject+
# @return the value returned by +future.fulfill+ or +future.reject+
def terminate(future = Concurrent::Promises.resolvable_future)
  @director.terminate
  future.fulfill(true)
rescue StandardError => error
  future.reject(error)
end