Class: TIMEx::Composers::Hedged

Inherits:
Base
  • Object
show all
Defined in:
lib/timex/composers/hedged.rb

Overview

Note:

Losers are stopped with Thread#kill; the block must tolerate concurrent execution, partial side effects, and abrupt termination.

Launches up to max: staggered parallel attempts (after: seconds apart) of child and returns the first successful result.

See Also:

Instance Method Summary collapse

Methods included from NamedComponent

included

Constructor Details

#initialize(after:, child:, max: 2, idempotent: false) ⇒ Hedged

Returns a new instance of Hedged.

Parameters:

  • after (Numeric)

    seconds between successive attempt launches

  • child (Symbol, Strategies::Base)

    strategy used per attempt

  • max (Integer) (defaults to: 2)

    maximum concurrent attempts (>= 1)

  • idempotent (Boolean) (defaults to: false)

    must be true; acknowledges concurrent/kill semantics

Raises:

  • (ArgumentError)

    when parameters are invalid



19
20
21
22
23
24
25
26
27
28
# File 'lib/timex/composers/hedged.rb', line 19

def initialize(after:, child:, max: 2, idempotent: false)
  super()
  raise ArgumentError, "Hedged requires idempotent: true" unless idempotent
  raise ArgumentError, "after must be a non-negative Numeric" unless after.is_a?(Numeric) && !after.negative?
  raise ArgumentError, "max must be >= 1" if max < 1

  @after = after
  @max = max
  @child = Registry.resolve(child)
end

Instance Method Details

#call(deadline:, on_timeout: :raise, **opts) {|deadline| ... } ⇒ Object

Returns first successful value or handler result.

Parameters:

  • deadline (Deadline, Numeric, Time, nil)
  • on_timeout (Symbol, Proc) (defaults to: :raise)
  • opts (Hash{Symbol => Object})

    forwarded to each child attempt

Yield Parameters:

Returns:

  • (Object)

    first successful value or handler result

Raises:

  • (StandardError)

    re-raised from a failed attempt when no success precedes it



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/timex/composers/hedged.rb', line 36

def call(deadline:, on_timeout: :raise, **opts, &block)
  deadline = Deadline.coerce(deadline)
  results = Queue.new
  # Side-channel of "a result just landed" notifications so the spawn
  # loop can block on `signal.pop(timeout:)` instead of polling — we
  # can't peek at `results` non-destructively without disturbing the
  # consumption order of `await_outcome`.
  signal = Queue.new
  threads = []

  threads << launch(deadline, results, signal, opts, &block)
  until !results.empty? || (threads.size >= @max) || deadline.expired?
    status = wait_for_result(@after, signal, deadline)
    break if status == :result_ready
    # `wait_for_result` returns `:expired` when the parent deadline
    # elapsed during the wait. Bail before launching a redundant
    # worker that would race against the expiration.
    break if status == :expired

    threads << launch(deadline, results, signal, opts, &block)
  end

  outcome = await_outcome(results, threads.size, deadline)
  threads.each { |t| t.kill if t.alive? }

  case outcome[0]
  when :ok then outcome[1]
  when :error then raise outcome[1]
  when :timeout
    handle_timeout(
      on_timeout,
      deadline.expired_error(
        strategy: :hedged,
        message: "all hedged attempts timed out"
      )
    )
  end
end