Class: TIMEx::Composers::Adaptive

Inherits:
Base
  • Object
show all
Defined in:
lib/timex/composers/adaptive.rb

Overview

Chooses a per-call child deadline from a latency estimator, then delegates to child with on_timeout: :raise so timeouts feed back into the estimator uniformly before applying the caller’s on_timeout:.

See Also:

Defined Under Namespace

Classes: InMemoryStore

Instance Method Summary collapse

Methods included from NamedComponent

included

Constructor Details

#initialize(child:, history: InMemoryStore.new, multiplier: 1.5, floor_ms: 25, ceiling_ms: 30_000) ⇒ Adaptive

Returns a new instance of Adaptive.

Parameters:

  • child (Symbol, Strategies::Base)

    inner strategy

  • history (#estimate_ms, #record) (defaults to: InMemoryStore.new)

    latency store (defaults to InMemoryStore)

  • multiplier (Numeric) (defaults to: 1.5)

    scales the estimate into a budget

  • floor_ms (Numeric) (defaults to: 25)

    minimum adaptive budget

  • ceiling_ms (Numeric) (defaults to: 30_000)

    maximum adaptive budget

Raises:

  • (ArgumentError)

    when parameters are invalid



150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/timex/composers/adaptive.rb', line 150

def initialize(child:, history: InMemoryStore.new, multiplier: 1.5, floor_ms: 25, ceiling_ms: 30_000)
  super()
  raise ArgumentError, "multiplier must be > 0" unless multiplier.is_a?(Numeric) && multiplier.positive?
  raise ArgumentError, "floor_ms must be a positive Numeric" unless floor_ms.is_a?(Numeric) && floor_ms.positive?
  raise ArgumentError, "ceiling_ms must be >= floor_ms" unless ceiling_ms.is_a?(Numeric) && ceiling_ms >= floor_ms

  @child = Registry.resolve(child)
  @history = history
  @multiplier = multiplier
  @floor_ms = floor_ms
  @ceiling_ms = ceiling_ms
end

Instance Method Details

#call(deadline: nil, on_timeout: :raise, **opts) {|deadline| ... } ⇒ Object

Returns child return or timeout handler result.

Parameters:

  • deadline (Deadline, Numeric, Time, nil, Object) (defaults to: nil)

    optional outer cap (min with adaptive budget)

  • on_timeout (Symbol, Proc) (defaults to: :raise)

    applied after child raises Expired

  • opts (Hash{Symbol => Object})

    forwarded to child

Yield Parameters:

Returns:

  • (Object)

    child return or timeout handler result

Raises:

  • (StandardError)

    non-timeout errors from the child propagate after recording latency



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/timex/composers/adaptive.rb', line 169

def call(deadline: nil, on_timeout: :raise, **opts, &block)
  estimate  = @history.estimate_ms
  budget_ms = if estimate
                (estimate * @multiplier).clamp(@floor_ms, @ceiling_ms)
              else
                @ceiling_ms
              end

  adaptive_deadline = Deadline.in(budget_ms / 1000.0)
  effective         = deadline ? Deadline.coerce(deadline).min(adaptive_deadline) : adaptive_deadline

  TIMEx::Telemetry.instrument(
    event: "composer.adaptive",
    estimate_ms: estimate&.round,
    budget_ms: budget_ms.round,
    deadline_ms: effective.infinite? ? nil : effective.remaining_ms.round
  ) do |payload|
    started = Clock.monotonic_ns
    begin
      # Force the child to surface `Expired` so we can record a uniform
      # timeout penalty regardless of the caller's `on_timeout:` (a
      # `:return_nil`/`:result` path would otherwise be recorded as a
      # success at ~budget_ms and never penalize the estimator). We
      # re-apply the caller's `on_timeout:` ourselves.
      value = @child.call(deadline: effective, on_timeout: :raise, **opts, &block)
      @history.record((Clock.monotonic_ns - started) / 1_000_000.0)
      value
    rescue Expired => e
      payload[:outcome] = :timeout
      # Record the *budget* as the penalty (capped at ceiling), not the
      # multiplied estimate. Previously we recorded the parent-clamped
      # budget_ms back into history, which on a tight parent deadline
      # could differ from what we actually waited and bias the estimator.
      # Use `effective.remaining_ms` (post-clamp elapsed) if available so
      # the estimator tracks real wait time, falling back to budget_ms.
      elapsed_ms = (Clock.monotonic_ns - started) / 1_000_000.0
      @history.record([elapsed_ms, budget_ms.to_f].max.clamp(@floor_ms, @ceiling_ms))
      handle_timeout(on_timeout, e)
    rescue StandardError
      # User-cancelled or otherwise-failed attempts should still feed
      # the estimator: a child that consistently raises after ~budget_ms
      # tells us latency is rising, even if the caller is the one
      # throwing the exception. Cap the recorded sample at the ceiling
      # so a slow upstream-of-failure doesn't pin the estimator high.
      elapsed_ms = (Clock.monotonic_ns - started) / 1_000_000.0
      @history.record(elapsed_ms.clamp(@floor_ms, @ceiling_ms))
      raise
    end
  end
end