Class: Phronomy::BlockingAdapterPool::PendingOperation

Inherits:
Object
  • Object
Defined in:
lib/phronomy/blocking_adapter_pool.rb

Overview

Represents the pending result of a submitted blocking operation. Returned immediately by #submit; call #await to wait for the result.
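
The submit-then-await pattern described above can be sketched with a small stand-in class. This is a minimal illustration and not Phronomy's implementation: MiniPending and its private finish helper are hypothetical names, and the worker is a plain Thread rather than a pool.

```ruby
# Hypothetical stand-in for a pending result; not the Phronomy class itself.
class MiniPending
  def initialize(&block)
    @block = block
    @done = false
    @value = nil
    @error = nil
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Runs on a worker thread; stores the outcome and wakes any waiters.
  def execute!
    finish(@block.call, nil)
  rescue => e
    finish(nil, e)
  end

  # Blocks the caller until the worker has stored a result.
  def await
    @mutex.synchronize { @cond.wait(@mutex) until @done }
    raise @error if @error

    @value
  end

  private

  def finish(value, error)
    @mutex.synchronize do
      @value = value
      @error = error
      @done = true
      @cond.broadcast
    end
  end
end

# The handle exists immediately; the value arrives once the worker finishes.
pending = MiniPending.new { 6 * 7 }
worker = Thread.new { pending.execute! }
result = pending.await
worker.join

# An error raised inside the block is re-raised to the awaiting caller.
failing = MiniPending.new { raise ArgumentError, "boom" }
Thread.new { failing.execute! }.join
error_message = begin
  failing.await
rescue ArgumentError => e
  e.message
end
```

The caller holds a handle as soon as the work is handed off, and only the call to await blocks.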

Constructor Details

#initialize(block, timeout: nil, cancellation_token: nil, on_abandoned: nil) ⇒ PendingOperation

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of PendingOperation.



# File 'lib/phronomy/blocking_adapter_pool.rb', line 201

def initialize(block, timeout: nil, cancellation_token: nil, on_abandoned: nil)
  @block = block
  @timeout = timeout
  @cancellation_token = cancellation_token
  @on_abandoned = on_abandoned
  @value = nil
  @error = nil
  @done = false
  @abandoned = false
  @wait_time = nil
  @submitted_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Instance Method Details

#abandoned? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns true when the operation was abandoned due to timeout.

Returns:

  • (Boolean)

    true when the operation was abandoned due to timeout



# File 'lib/phronomy/blocking_adapter_pool.rb', line 47

def abandoned?
  @abandoned
end

#await(timeout: nil, cancellation_token: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

Cooperative cancellation semantics (ADR-010): Phronomy uses a non-preemptive, cooperative-first concurrency model. Cancellation is cooperative, not preemptive:

  • When a +cancellation_token+ is cancelled, +CancellationError+ is raised to the +await+ caller immediately; when the timeout fires, +TimeoutError+ is raised instead. In both cases, the underlying worker thread is not forcibly stopped.
  • The worker thread will complete its submitted block naturally. Code inside the block must call +token.check!+ at suitable checkpoints to observe the cancelled state and exit early.
  • There is no +Thread#kill+ or +Thread#raise+ involved. The framework never forcibly terminates worker threads.
Note:

Cooperative timeout limitation: the +timeout:+ parameter passed to +await+ is not enforced on the cooperative path. The calling Fiber remains suspended until the worker thread finishes regardless of how many seconds elapse. This is because the cooperative scheduler cannot preempt a running OS thread. If a time bound is required, set +timeout:+ at submit time instead; the pool will then abandon the operation on the worker side and mark it as #abandoned?.

Blocks until the operation completes and returns its value.

An optional +timeout+ (in seconds) may be passed here; it is measured from the moment +await+ is called. If both a submit-time timeout and an await-time timeout are present, the smaller of the two values is used, and it is likewise counted from the +await+ call rather than from submission. The worker thread is NOT interrupted; it runs to completion on its own.
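
The timeout selection and the deadline-based wait on the thread path can be sketched as follows. The helper names effective_timeout and wait_with_deadline are hypothetical and exist only for this illustration; the sketch returns symbols where the real method raises TimeoutError.

```ruby
# Hypothetical helper: pick the smaller of the two timeouts, ignoring nils.
def effective_timeout(await_timeout, submit_timeout)
  [await_timeout, submit_timeout].compact.min
end

# Hypothetical helper: wait on a condition variable until the block returns
# true or the monotonic deadline passes. Returns :done or :timed_out.
def wait_with_deadline(mutex, cond, seconds)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + seconds
  mutex.synchronize do
    until yield
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      return :timed_out if remaining <= 0

      # Sleep only for the time left, so spurious wakeups cannot extend the wait.
      cond.wait(mutex, remaining)
    end
  end
  :done
end

both = effective_timeout(5, 2)
only_submit = effective_timeout(nil, 3)
neither = effective_timeout(nil, nil)

never_ready = wait_with_deadline(Mutex.new, ConditionVariable.new, 0.05) { false }
already_ready = wait_with_deadline(Mutex.new, ConditionVariable.new, 0.05) { true }
```

Recomputing the remaining time on every loop iteration keeps the total wait bounded by the original deadline.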

An optional +cancellation_token+ may be passed here (or at submit time). If the token is cancelled while waiting, CancellationError is raised immediately without interrupting the worker.
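
Because the worker is never interrupted, the submitted block has to observe cancellation itself. A minimal sketch of such a checkpoint loop follows; SketchToken and SketchCancelled are hypothetical stand-ins, and the real CancellationToken may differ beyond the cancelled? and check! methods named in this page.

```ruby
# Hypothetical error and token classes, defined only for this sketch.
class SketchCancelled < StandardError; end

class SketchToken
  def initialize
    @cancelled = false
    @mutex = Mutex.new
  end

  def cancel!
    @mutex.synchronize { @cancelled = true }
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end

  # Raises at a checkpoint once the token has been cancelled.
  def check!
    raise SketchCancelled, "cancelled" if cancelled?
  end
end

token = SketchToken.new
started = Queue.new
steps_done = 0

worker = Thread.new do
  begin
    1_000.times do
      token.check!                 # cooperative checkpoint
      steps_done += 1
      started << true if steps_done == 1
      sleep 0.005                  # stands in for one unit of blocking work
    end
    :finished
  rescue SketchCancelled
    :stopped_early
  end
end

started.pop      # wait until the worker has completed at least one step
token.cancel!    # request cancellation; the worker exits at its next check!
outcome = worker.value
```

Nothing here calls Thread#kill or Thread#raise; the worker leaves the loop only because it polls the token.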

Cooperative path (:fiber / DeterministicScheduler): When called from a Fiber managed by DeterministicScheduler (i.e. under the +:fiber+ runtime backend), the calling Fiber suspends cooperatively via +Fiber.yield+ rather than blocking the OS thread. The Fiber is resumed on the scheduler's ready queue once the worker thread completes the operation.
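
The suspend-and-resume handoff on the cooperative path can be sketched without the real scheduler. In this illustration a plain Queue stands in for the scheduler's ready queue, and ready_queue, result_box and events are hypothetical names; DeterministicScheduler itself is not used.

```ruby
ready_queue = Queue.new   # stand-in for the scheduler's ready queue
events = []

task = Fiber.new do
  waiting_fiber = Fiber.current
  result_box = {}

  # The worker thread does the blocking work, then enqueues a wake-up
  # callable instead of resuming the Fiber itself.
  worker = Thread.new do
    result_box[:value] = 21 * 2
    ready_queue << -> { waiting_fiber.resume }
  end

  events << :suspending
  Fiber.yield(:cooperative_suspend)   # hand control back to the driver
  worker.join
  events << :resumed
  result_box[:value]
end

# Driver loop body: run the Fiber until it suspends, then resume it from
# the thread that created it once the worker has signalled completion.
signal = task.resume
wake = ready_queue.pop
final_value = wake.call
```

The wake-up callable is only called on the thread that owns the Fiber, since Ruby does not allow a Fiber to be resumed from a different thread.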

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    seconds from now before raising TimeoutError (thread path only; ignored on the cooperative/fiber path)

  • cancellation_token (CancellationToken, nil) (defaults to: nil)

Returns:

  • (Object)

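Raises:

  • (CancellationError)

    when the effective cancellation token is cancelled before or during the wait

  • (Phronomy::TimeoutError)

    when the effective timeout elapses before the operation completes (thread path only)

  • (StandardError)

    any error captured from the submitted block, re-raised to the caller
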

# File 'lib/phronomy/blocking_adapter_pool.rb', line 105

def await(timeout: nil, cancellation_token: nil)
  effective_timeout = [timeout, @timeout].compact.min
  effective_token = cancellation_token || @cancellation_token

  raise CancellationError, "blocking operation cancelled" if effective_token&.cancelled?

  # Cooperative context: suspend the calling Fiber rather than blocking
  # the OS thread so that DeterministicScheduler can continue dispatching
  # other tasks while waiting for the blocking worker to finish.
  # (Issue #338, ADR-010 Rule 3)
  # Uses the same thread-local key as Task::FiberBackend::SCHEDULER_KEY
  # (:phronomy_deterministic_scheduler) to avoid a cross-file constant
  # dependency at load time.
  scheduler = Thread.current.thread_variable_get(:phronomy_deterministic_scheduler)
  in_managed_fiber = !Fiber.respond_to?(:main) || Fiber.current != Fiber.main
  if scheduler && in_managed_fiber
    unless @done
      # Register this await with the scheduler so run_until_idle knows
      # not to exit until the worker thread completes (Issue #338).
      scheduler.track_blocking_await
      waiting_fiber = Fiber.current
      on_complete do |_result, _error|
        # Decrement the counter and wake run_until_idle, then re-enqueue
        # the suspended Fiber for cooperative resumption.
        scheduler.complete_blocking_await
        scheduler.enqueue_fiber(-> { waiting_fiber.resume })
      end
      Fiber.yield(:cooperative_suspend)
    end
    raise CancellationError, "blocking operation cancelled" if effective_token&.cancelled?
    raise @error if @error

    return @value
  end

  # Wake up the waiting thread whenever the token is cancelled so we can
  # propagate cancellation without sleeping until the timeout expires.
  effective_token&.on_cancel { @mutex.synchronize { @cond.broadcast } }

  if effective_timeout
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + effective_timeout
    @mutex.synchronize do
      until @done
        raise CancellationError, "blocking operation cancelled" if effective_token&.cancelled?

        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        if remaining <= 0
          # Guard against double-counting when await is called multiple times.
          unless @abandoned
            @abandoned = true
            @on_abandoned&.call
          end
          raise Phronomy::TimeoutError, "blocking operation timed out after #{effective_timeout}s"
        end
        @cond.wait(@mutex, remaining)
      end
    end
  else
    @mutex.synchronize do
      until @done
        raise CancellationError, "blocking operation cancelled" if effective_token&.cancelled?

        @cond.wait(@mutex)
      end
    end
  end
  raise @error if @error

  @value
end

#done? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns true when the operation has finished (success or error).

Returns:

  • (Boolean)

    true when the operation has finished (success or error)



# File 'lib/phronomy/blocking_adapter_pool.rb', line 41

def done?
  @mutex.synchronize { @done }
end

#execute! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/phronomy/blocking_adapter_pool.rb', line 217

def execute!
  @wait_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @submitted_at

  if @cancellation_token&.cancelled?
    complete_with_error!(CancellationError.new("operation cancelled before execution"))
    return
  end

  # Do NOT use Timeout.timeout here — it delivers an async Thread#raise
  # that can corrupt external library state (mutexes, C extensions, etc.).
  # Timeout enforcement is handled cooperatively in #await instead.
  # Each blocking library (Net::HTTP, pg, redis, etc.) should set its
  # own native connection/read timeouts.
  begin
    complete_with_value!(@block.call)
  rescue => e
    complete_with_error!(e)
  end
end

#on_complete {|result, error| ... } ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers a callback to be called when the operation finishes. If the operation has already finished, the callback is invoked immediately on the calling thread; otherwise it is invoked later on the worker thread that completes the operation.

The callback receives +result+ and +error+ (one of them will be +nil+).
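
The two invocation paths, immediate and deferred, can be sketched with a small stand-in. CallbackCell and its complete method are hypothetical; the completion side of the real class is not shown in this page, so that half is an assumption.

```ruby
# Hypothetical stand-in illustrating immediate vs. deferred callbacks.
class CallbackCell
  def initialize
    @mutex = Mutex.new
    @done = false
    @callbacks = []
  end

  def on_complete(&callback)
    fire_args = nil
    @mutex.synchronize do
      if @done
        fire_args = [@value, @error]   # already finished: fire right away
      else
        @callbacks << callback         # still running: defer
      end
    end
    # Invoke outside the lock so a callback cannot deadlock on the mutex.
    callback.call(*fire_args) if fire_args
    self
  end

  # Assumed completion step: record the outcome, then run deferred callbacks.
  def complete(value, error = nil)
    pending = @mutex.synchronize do
      @value = value
      @error = error
      @done = true
      @callbacks.dup.tap { @callbacks.clear }
    end
    pending.each { |cb| cb.call(value, error) }
  end
end

seen = []
cell = CallbackCell.new
cell.on_complete { |result, _error| seen << [:early, result] }
cell.complete(:ok)
cell.on_complete { |result, _error| seen << [:late, result] }
```

A callback registered before completion runs on whichever thread calls complete, while one registered afterwards runs on the registering thread.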

Yields:

  • (result, error)

Returns:

  • (self)


# File 'lib/phronomy/blocking_adapter_pool.rb', line 186

def on_complete(&callback)
  fire_args = nil
  @mutex.synchronize do
    if @done
      fire_args = [@value, @error]
    else
      @callbacks ||= []
      @callbacks << callback
    end
  end
  callback.call(*fire_args) if fire_args
  self
end

#wait_time ⇒ Float

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns seconds spent in the queue before execution started.

Returns:

  • (Float)

    seconds spent in the queue before execution started



# File 'lib/phronomy/blocking_adapter_pool.rb', line 53

def wait_time
  @wait_time || 0.0
end