Class: TIMEx::Composers::Hedged
- Defined in: lib/timex/composers/hedged.rb
Overview
Note:
Losers are stopped with Thread#kill; the block must tolerate concurrent execution, partial side effects, and abrupt termination.
Launches up to max: staggered parallel attempts (after: seconds apart) of child and returns the first successful result.
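The staggered-launch behaviour described above can be sketched with the Ruby standard library alone. This is an illustration of the general hedging pattern, not the TIMEx implementation: the `hedge` method and every name inside it are hypothetical, and deadline handling and the on_timeout handler are left out on purpose.

```ruby
# Stdlib-only sketch of staggered hedging. `hedge` and its internals are
# hypothetical names for illustration; they are not part of TIMEx.
def hedge(after:, max: 2, &attempt)
  lock     = Mutex.new
  landed   = ConditionVariable.new
  outcomes = [] # [:ok, value] or [:error, exception], in arrival order
  threads  = []
  clock    = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

  launch = lambda do |index|
    Thread.new do
      result = begin
        [:ok, attempt.call(index)]
      rescue StandardError => e
        [:error, e]
      end
      lock.synchronize do
        outcomes << result
        landed.signal
      end
    end
  end

  outcome = lock.synchronize do
    max.times do |index|
      threads << launch.call(index)
      # Give the running attempts `after` seconds before starting another.
      wake_at = clock.call + after
      while outcomes.empty? && (remaining = wake_at - clock.call).positive?
        landed.wait(lock, remaining)
      end
      break unless outcomes.empty?
    end
    # Wait for the first success, or for every launched attempt to fail.
    landed.wait(lock) until outcomes.any? { |o| o[0] == :ok } || outcomes.size == threads.size
    outcomes.find { |o| o[0] == :ok } || outcomes.first
  end

  threads.each { |t| t.kill if t.alive? } # losers are stopped abruptly
  raise outcome[1] if outcome[0] == :error

  outcome[1]
end

value = hedge(after: 0.05, max: 3) do |index|
  sleep(index.zero? ? 1.0 : 0.01) # the first attempt is the slow one
  "attempt #{index}"
end
puts value
```

In this sketch the second attempt starts 0.05 seconds after the first and finishes well before it, so the slow first attempt is the one that gets killed. That is why the block has to be safe to run more than once and safe to abandon halfway.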
Instance Method Summary
- #call(deadline:, on_timeout: :raise, **opts) {|deadline| ... } ⇒ Object
  First successful value or handler result.
- #initialize(after:, child:, max: 2, idempotent: false) ⇒ Hedged (constructor)
  A new instance of Hedged.
Methods included from NamedComponent
Constructor Details
#initialize(after:, child:, max: 2, idempotent: false) ⇒ Hedged
Returns a new instance of Hedged.
    # File 'lib/timex/composers/hedged.rb', line 19

    def initialize(after:, child:, max: 2, idempotent: false)
      super()
      raise ArgumentError, "Hedged requires idempotent: true" unless idempotent
      raise ArgumentError, "after must be a non-negative Numeric" unless after.is_a?(Numeric) && !after.negative?
      raise ArgumentError, "max must be >= 1" if max < 1

      @after = after
      @max = max
      @child = Registry.resolve(child)
    end
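The guard clauses in the constructor run in a fixed order, and the default `idempotent: false` is always rejected, so callers have to opt in explicitly. The sketch below copies only those three checks into a hypothetical stand-in class (`HedgedArgs`, with a hypothetical `rejection_for` helper) so it can run without TIMEx. It does not resolve a child strategy or call `super`.

```ruby
# Hypothetical stand-in that mirrors only the argument checks shown above.
# It is not the TIMEx class and has no dependency on the gem.
class HedgedArgs
  attr_reader :after, :max

  def initialize(after:, max: 2, idempotent: false)
    raise ArgumentError, "Hedged requires idempotent: true" unless idempotent
    raise ArgumentError, "after must be a non-negative Numeric" unless after.is_a?(Numeric) && !after.negative?
    raise ArgumentError, "max must be >= 1" if max < 1

    @after = after
    @max = max
  end
end

# Returns the rejection message, or nil when the arguments are accepted.
def rejection_for(**kwargs)
  HedgedArgs.new(**kwargs)
  nil
rescue ArgumentError => e
  e.message
end

p rejection_for(after: 0.05)                           # idempotent left at its default
p rejection_for(after: -1, idempotent: true)           # negative stagger
p rejection_for(after: 0.05, max: 0, idempotent: true) # no attempts allowed
p rejection_for(after: 0.05, idempotent: true)         # accepted
```

Because the idempotency check comes first, a call that omits `idempotent: true` reports that problem even when `after` or `max` is also invalid.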
Instance Method Details
#call(deadline:, on_timeout: :raise, **opts) {|deadline| ... } ⇒ Object
Returns the first successful value, or the result of the on_timeout handler when the outcome is a timeout.
    # File 'lib/timex/composers/hedged.rb', line 36

    def call(deadline:, on_timeout: :raise, **opts, &block)
      deadline = Deadline.coerce(deadline)
      results = Queue.new
      # Side-channel of "a result just landed" notifications so the spawn
      # loop can block on `signal.pop(timeout:)` instead of polling; we
      # can't peek at `results` non-destructively without disturbing the
      # consumption order of `await_outcome`.
      signal = Queue.new
      threads = []
      threads << launch(deadline, results, signal, opts, &block)
      until !results.empty? || (threads.size >= @max) || deadline.expired?
        status = wait_for_result(@after, signal, deadline)
        break if status == :result_ready
        # `wait_for_result` returns `:expired` when the parent deadline
        # elapsed during the wait. Bail before launching a redundant
        # worker that would race against the expiration.
        break if status == :expired

        threads << launch(deadline, results, signal, opts, &block)
      end
      outcome = await_outcome(results, threads.size, deadline)
      threads.each { |t| t.kill if t.alive? }
      case outcome[0]
      when :ok then outcome[1]
      when :error then raise outcome[1]
      when :timeout
        handle_timeout(
          on_timeout,
          deadline.expired_error(
            strategy: :hedged,
            message: "all hedged attempts timed out"
          )
        )
      end
    end
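The `threads.each { |t| t.kill if t.alive? }` line is what makes abrupt termination a concern for the block. A minimal sketch of what a killed attempt experiences, using only core Ruby: code after the interruption point never runs, but `ensure` clauses do, so cleanup belongs there. The `kill_and_collect` helper and the event names are hypothetical.

```ruby
# Sketch: what a losing attempt sees when it is stopped with Thread#kill.
# `kill_and_collect` is a hypothetical helper, not part of TIMEx.
def kill_and_collect
  log   = Queue.new
  ready = Queue.new

  loser = Thread.new do
    log << :started
    ready << true
    sleep              # stands in for a slow attempt that never finishes
    log << :finished   # skipped: the thread is killed while sleeping
  ensure
    log << :cleaned_up # still runs, so release resources here
  end

  ready.pop            # make sure the attempt is actually running
  loser.kill
  loser.join

  events = []
  events << log.pop until log.empty?
  events
end

p kill_and_collect
```

The collected events contain `:started` and `:cleaned_up` but not `:finished`, which is the partial-side-effect case the class note warns about.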