Class: FiberStream::RunningPipeline
- Inherits: Object
  - Object
  - FiberStream::RunningPipeline
- Defined in: lib/fiber_stream/running_pipeline.rb
Instance Method Summary
- #cancel ⇒ Object
  Requests cancellation of the background pipeline.
- #cancel_requested? ⇒ Boolean
  Returns true after `cancel` successfully records a cancellation request.
- #done? ⇒ Boolean
  Returns true when the background run has completed with success, failure, or cancellation.
- #initialize(scheduler, &run) ⇒ RunningPipeline (constructor)
  A new instance of RunningPipeline.
- #wait ⇒ Object
  Waits for the background pipeline to complete.
Constructor Details
#initialize(scheduler, &run) ⇒ RunningPipeline
Returns a new instance of RunningPipeline.
    # File 'lib/fiber_stream/running_pipeline.rb', line 5
    def initialize(scheduler, &run)
      @scheduler = scheduler
      @completion = nil
      @waiters = []
      @mutex = Mutex.new
      @cancel_requested = false
      @cancellation_error = nil
      @fiber = Fiber.schedule { run_background(run) }
    end
Instance Method Details
#cancel ⇒ Object
Requests cancellation of the background pipeline.
Cancellation is cooperative and uses the scheduler captured when `Pipeline#run_async` started the background fiber. The method is idempotent. If the captured scheduler cannot interrupt fibers, this method raises `NotImplementedError` without recording a cancellation request.
    # File 'lib/fiber_stream/running_pipeline.rb', line 47
    def cancel
      fiber = nil
      cancellation_error = nil
      @mutex.synchronize do
        return self if @completion
        return self if @cancel_requested
        unless @scheduler.respond_to?(:fiber_interrupt)
          raise NotImplementedError, "scheduler does not support fiber_interrupt"
        end
        cancellation_error = PipelineCancelledError.new("pipeline cancelled")
        @cancellation_error = cancellation_error
        @cancel_requested = true
        fiber = @fiber
      end
      interrupt(fiber, cancellation_error)
      self
    end
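The bookkeeping that `#cancel` describes (idempotence, and refusing to record a request when the scheduler lacks `fiber_interrupt`) can be illustrated in isolation. The sketch below is not the library's implementation. `CancelSketch`, `InterruptingScheduler`, `PlainScheduler`, and the `:fake_fiber` placeholder are hypothetical stand-ins, and it assumes the scheduler hook takes a fiber and an exception.

```ruby
# Hypothetical stand-in that models only the cancel bookkeeping.
class CancelSketch
  class CancelledError < StandardError; end

  def initialize(scheduler)
    @scheduler = scheduler
    @mutex = Mutex.new
    @cancel_requested = false
    @completed = false
  end

  def cancel
    error = nil
    @mutex.synchronize do
      # Repeated calls, or calls after completion, are no-ops.
      return self if @completed || @cancel_requested
      # Check capability before recording anything, so a failed
      # call leaves cancel_requested? false.
      unless @scheduler.respond_to?(:fiber_interrupt)
        raise NotImplementedError, "scheduler does not support fiber_interrupt"
      end
      error = CancelledError.new("pipeline cancelled")
      @cancel_requested = true
    end
    # Interrupt outside the lock; :fake_fiber is a placeholder (assumption).
    @scheduler.fiber_interrupt(:fake_fiber, error)
    self
  end

  def cancel_requested?
    @mutex.synchronize { @cancel_requested }
  end
end

# Hypothetical scheduler that records interrupt requests.
class InterruptingScheduler
  attr_reader :interrupts

  def initialize
    @interrupts = []
  end

  def fiber_interrupt(fiber, error)
    @interrupts << [fiber, error]
  end
end

# Hypothetical scheduler without interrupt support.
class PlainScheduler; end
```

Calling `cancel` twice on a `CancelSketch` built with `InterruptingScheduler` should record a single interrupt, while one built with `PlainScheduler` raises and stays un-cancelled.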
#cancel_requested? ⇒ Boolean
Returns true after `cancel` successfully records a cancellation request.
    # File 'lib/fiber_stream/running_pipeline.rb', line 76
    def cancel_requested?
      @mutex.synchronize { @cancel_requested }
    end
#done? ⇒ Boolean
Returns true when the background run has completed with success, failure, or cancellation.
    # File 'lib/fiber_stream/running_pipeline.rb', line 71
    def done?
      @mutex.synchronize { !@completion.nil? }
    end
#wait ⇒ Object
Waits for the background pipeline to complete.
On success, returns the sink materialized value. On stream failure, re-raises the original exception. If cancellation interrupts the background materialization, raises `PipelineCancelledError`. Waiting before completion requires a scheduler-backed non-blocking fiber; waiting after completion replays the stored result without requiring a scheduler.
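    # File 'lib/fiber_stream/running_pipeline.rb', line 22
    # NOTE: the name of the local variable was lost in extraction;
    # `completion` is an assumed name.
    def wait
      completion = nil
      waiter = nil
      @mutex.synchronize do
        if @completion
          # Already finished: replay the stored result.
          completion = @completion
        else
          validate_scheduler!("RunningPipeline#wait")
          waiter = Thread::Queue.new
          @waiters << waiter
        end
      end
      # Block on the queue only when the run had not completed yet.
      completion ||= waiter.pop
      deliver(completion)
    end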
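The store-then-replay behaviour of `#wait` can be sketched independently of the library. The class below is a hypothetical model, not the actual `RunningPipeline`: `CompletionSketch` and its `complete` method are invented for illustration. It also uses plain threads rather than a scheduler-backed fiber, so it omits the scheduler validation step.

```ruby
# Hypothetical model of the completion/waiter pattern.
class CompletionSketch
  def initialize
    @mutex = Mutex.new
    @completion = nil
    @waiters = []
  end

  # Records the outcome once and wakes every registered waiter.
  def complete(value: nil, error: nil)
    waiters = nil
    @mutex.synchronize do
      return if @completion
      @completion = [value, error].freeze
      waiters = @waiters
      @waiters = []
    end
    waiters.each { |queue| queue << @completion }
  end

  # Returns the value, or re-raises the stored error, on every call.
  def wait
    completion = nil
    waiter = nil
    @mutex.synchronize do
      if @completion
        completion = @completion
      else
        waiter = Thread::Queue.new
        @waiters << waiter
      end
    end
    # Only callers that arrived before completion block on a queue.
    completion ||= waiter.pop
    value, error = completion
    raise error if error
    value
  end
end
```

Because the outcome is stored rather than consumed, a second `wait` after completion sees the same value or the same exception as the first.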