Class: FiberStream::RunningPipeline

Inherits:
Object
Defined in:
lib/fiber_stream/running_pipeline.rb

Instance Method Summary

Constructor Details

#initialize(scheduler, &run) ⇒ RunningPipeline

Returns a new instance of RunningPipeline.



# File 'lib/fiber_stream/running_pipeline.rb', line 5

def initialize(scheduler, &run)
  @scheduler = scheduler
  @completion = nil
  @waiters = []
  @mutex = Mutex.new
  @cancel_requested = false
  @cancellation_error = nil
  @fiber = Fiber.schedule { run_background(run) }
end

Instance Method Details

#cancel ⇒ Object

Requests cancellation of the background pipeline.

Cancellation is cooperative and uses the scheduler captured when `Pipeline#run_async` started the background fiber. The method is idempotent: calling it after the run has completed, or after an earlier request was recorded, does nothing and returns `self`. If the captured scheduler cannot interrupt fibers, this method raises `NotImplementedError` without recording a cancellation request.



# File 'lib/fiber_stream/running_pipeline.rb', line 47

def cancel
  fiber = nil
  cancellation_error = nil

  @mutex.synchronize do
    return self if @completion
    return self if @cancel_requested

    unless @scheduler.respond_to?(:fiber_interrupt)
      raise NotImplementedError, "scheduler does not support fiber_interrupt"
    end

    cancellation_error = PipelineCancelledError.new("pipeline cancelled")
    @cancellation_error = cancellation_error
    @cancel_requested = true
    fiber = @fiber
  end

  interrupt(fiber, cancellation_error)
  self
end
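
The contract described for `#cancel` (a capability check before any state is recorded, and at most one interrupt) can be illustrated in isolation. The following is a minimal sketch, not the FiberStream implementation: `CancellableHandle`, `RecordingScheduler`, `CancelledError`, and `finish` are hypothetical names, the scheduler is a stub that only records calls, and the two-argument `fiber_interrupt(fiber, error)` call on the stub is an assumption, since the private `interrupt` helper is not shown on this page.

```ruby
# Hypothetical stand-in for RunningPipeline's cancellation bookkeeping.
class CancellableHandle
  CancelledError = Class.new(StandardError)

  def initialize(scheduler, fiber)
    @scheduler = scheduler
    @fiber = fiber
    @mutex = Mutex.new
    @completed = false
    @cancel_requested = false
  end

  # Marks the run as finished; later cancel calls become no-ops.
  def finish
    @mutex.synchronize { @completed = true }
    self
  end

  def cancel
    error = nil

    @mutex.synchronize do
      # Idempotent: nothing to do once completed or already requested.
      return self if @completed || @cancel_requested

      # Check the capability before recording anything, so a failed
      # cancel leaves the handle's state untouched.
      unless @scheduler.respond_to?(:fiber_interrupt)
        raise NotImplementedError, "scheduler does not support fiber_interrupt"
      end

      error = CancelledError.new("cancelled")
      @cancel_requested = true
    end

    # Interrupt outside the lock so the scheduler never runs while it is held.
    @scheduler.fiber_interrupt(@fiber, error)
    self
  end

  def cancel_requested?
    @mutex.synchronize { @cancel_requested }
  end
end

# Stub scheduler (assumption): records interrupts instead of delivering them.
class RecordingScheduler
  attr_reader :interrupts

  def initialize
    @interrupts = []
  end

  def fiber_interrupt(fiber, error)
    @interrupts << [fiber, error]
  end
end
```

In this sketch, calling `cancel` twice on a handle backed by `RecordingScheduler` records a single interrupt, while a handle backed by a plain `Object` raises `NotImplementedError` and `cancel_requested?` stays false.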

#cancel_requested? ⇒ Boolean

Returns true after `cancel` successfully records a cancellation request.

Returns:

  • (Boolean)


# File 'lib/fiber_stream/running_pipeline.rb', line 76

def cancel_requested?
  @mutex.synchronize { @cancel_requested }
end

#done? ⇒ Boolean

Returns true when the background run has completed with success, failure, or cancellation.

Returns:

  • (Boolean)


# File 'lib/fiber_stream/running_pipeline.rb', line 71

def done?
  @mutex.synchronize { !@completion.nil? }
end

#wait ⇒ Object

Waits for the background pipeline to complete.

On success, returns the sink materialized value. On stream failure, re-raises the original exception. If cancellation interrupts the background materialization, raises `PipelineCancelledError`. Waiting before completion requires a scheduler-backed non-blocking fiber; waiting after completion replays the stored result without requiring a scheduler.



# File 'lib/fiber_stream/running_pipeline.rb', line 22

def wait
  message = nil
  waiter = nil

  @mutex.synchronize do
    if @completion
      message = @completion
    else
      validate_scheduler!("RunningPipeline#wait")
      waiter = Thread::Queue.new
      @waiters << waiter
    end
  end

  message ||= waiter.pop
  deliver(message)
end
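
The wait-then-replay behaviour can be tried without the library. The following is a minimal sketch under stated assumptions, not the FiberStream implementation: `CompletionLatch`, `complete`, and the `[:ok, value]` / `[:error, exception]` message shape are hypothetical, plain threads stand in for scheduler-backed fibers, and the scheduler validation step is omitted. The private `deliver` here only guesses at what the real helper does, based on the description above.

```ruby
# Hypothetical stand-in for the completion/waiter bookkeeping in #wait.
class CompletionLatch
  def initialize
    @mutex = Mutex.new
    @completion = nil
    @waiters = []
  end

  # Stores the outcome once and wakes every registered waiter.
  def complete(kind, payload)
    message = [kind, payload].freeze
    waiters = nil

    @mutex.synchronize do
      return if @completion # first outcome wins

      @completion = message
      waiters = @waiters
      @waiters = []
    end

    waiters.each { |queue| queue << message }
  end

  def done?
    @mutex.synchronize { !@completion.nil? }
  end

  def wait
    message = nil
    waiter = nil

    @mutex.synchronize do
      if @completion
        message = @completion # replay the stored outcome
      else
        waiter = Thread::Queue.new
        @waiters << waiter    # register before releasing the lock
      end
    end

    message ||= waiter.pop    # blocks until complete pushes the outcome
    deliver(message)
  end

  private

  # Returns the value on success, re-raises the stored exception otherwise.
  def deliver(message)
    kind, payload = message
    raise payload if kind == :error

    payload
  end
end

latch = CompletionLatch.new
worker = Thread.new do
  sleep 0.05
  latch.complete(:ok, 42)
end

first = latch.wait   # blocks until the worker completes
second = latch.wait  # returns immediately from the stored outcome
worker.join
```

Registering the queue while the lock is held is what closes the race in this sketch: a completion that arrives between the check and the `pop` still finds the waiter in the list and pushes the outcome to it.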