Class: Dommy::ReadableStream

Inherits:
Object
  • Object
show all
Defined in:
lib/dommy/streams.rb

Overview

`ReadableStream` — implements the pull-from-source pattern via an `underlyingSource` whose `start` / `pull` callbacks receive a `Controller` used to enqueue chunks. `getReader()` returns a `ReadableStreamDefaultReader` exposing `.read()`, which returns a Promise of `{ value:, done: }`.

Defined Under Namespace

Classes: Error

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(window, underlying_source = nil) ⇒ ReadableStream

Returns a new instance of ReadableStream.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/dommy/streams.rb', line 19

# Builds a stream in the +:readable+ state with an empty chunk queue, no
# reader attached and no reads waiting.
#
# @param window [Object] stored in +@window+ and handed to the promises
#   this stream creates
# @param underlying_source [Hash, nil] optional callbacks; only the
#   "start" and "pull" entries are read (keys may be Strings or Symbols,
#   they are normalised with +to_s+). Anything that is not a Hash is
#   treated as an empty source.
def initialize(window, underlying_source = nil)
  @window = window
  @state = :readable
  @queue = []
  @pending_reads = []
  @reader = nil
  @controller = ReadableStreamDefaultController.new(self)

  callbacks =
    if underlying_source.is_a?(Hash)
      underlying_source.transform_keys(&:to_s)
    else
      {}
    end

  # Kept for later: __read__ invokes it when a read finds the queue empty.
  @pull_callback = callbacks["pull"]

  # The start callback, when supplied, runs once during construction and
  # receives the controller as its only argument.
  on_start = callbacks["start"]
  invoke(on_start, [@controller]) if on_start
end

Instance Attribute Details

#state ⇒ Object (readonly)

Returns the value of attribute state.



17
18
19
# File 'lib/dommy/streams.rb', line 17

# Reader for the stream's lifecycle state.
#
# @return [Symbol] the current value of +@state+; the methods of this class
#   assign +:readable+, +:closed+ or +:errored+ to it
def state
  @state
end

Instance Method Details

#__close__ ⇒ Object



70
71
72
73
# File 'lib/dommy/streams.rb', line 70

# Moves a readable stream into the +:closed+ state and then flushes the
# reads that were parked while waiting for data.
#
# Only a stream that is still +:readable+ is closed. Closing a stream that
# is already closed or errored is ignored, so a late close can no longer
# overwrite the +:errored+ state (which would make later reads report
# "done" instead of rejecting with the stored error reason).
#
# @return [Object, nil] the result of +flush_pending_reads+, or +nil+ when
#   the call was ignored
def __close__
  return unless @state == :readable

  @state = :closed
  flush_pending_reads
end

#__drain__ ⇒ Object

Convenience: iterate all chunks synchronously into an Array.



54
55
56
57
58
59
60
61
# File 'lib/dommy/streams.rb', line 54

# Convenience: removes every chunk currently sitting in the queue and
# returns them, oldest first, in a new Array. Only already-queued chunks
# are taken; the pull callback is not invoked. The queue is empty afterwards.
#
# @return [Array] the drained chunks (empty when nothing was queued)
def __drain__
  @queue.shift(@queue.length)
end

#__enqueue__(chunk) ⇒ Object

Raises:

  • (Error) — when the stream is not in the `:readable` state



63
64
65
66
67
68
# File 'lib/dommy/streams.rb', line 63

# Appends a chunk to the end of the queue and then flushes pending reads so
# a waiting reader can pick it up.
#
# @param chunk [Object] the value to queue; it is stored as-is
# @return [Object] the result of +flush_pending_reads+
# @raise [Error] when the stream state is anything other than +:readable+
def __enqueue__(chunk)
  if @state != :readable
    raise Error, "stream is not readable"
  end

  @queue.push(chunk)
  flush_pending_reads
end

#__error__(reason) ⇒ Object



75
76
77
78
79
# File 'lib/dommy/streams.rb', line 75

# Puts a readable stream into the +:errored+ state, remembers the reason
# and flushes the reads that were waiting for data.
#
# Two details follow the WHATWG controller "error" algorithm:
# * the call is ignored unless the stream is still +:readable+, so an
#   already closed stream stays closed and the first error reason wins;
# * queued chunks are discarded. +__read__+ serves the queue before it
#   looks at the state, so leftover chunks would otherwise still be
#   delivered by an errored stream.
#
# @param reason [Object] stored in +@error_reason+; +__read__+ rejects with
#   it while the stream is errored
# @return [Object, nil] the result of +flush_pending_reads+, or +nil+ when
#   the call was ignored
def __error__(reason)
  return unless @state == :readable

  @state = :errored
  @error_reason = reason
  @queue.clear
  flush_pending_reads
end

#__js_call__(method, args) ⇒ Object



105
106
107
108
109
110
111
112
# File 'lib/dommy/streams.rb', line 105

# Dispatches a method call by its JavaScript-style name to the matching
# Ruby method. (The name suggests this is the entry point used by the JS
# bridge; confirm against the caller.)
#
# @param method [String] "getReader" or "cancel"; any other name is ignored
# @param args [Array] call arguments; only the first one is used, and only
#   for "cancel"
# @return [Object, nil] the result of +get_reader+ or +cancel+, or +nil+
#   for an unrecognised name
def __js_call__(method, args)
  if method == "getReader"
    get_reader
  elsif method == "cancel"
    reason = args[0]
    cancel(reason)
  end
end

#__js_get__(key) ⇒ Object



98
99
100
101
102
103
# File 'lib/dommy/streams.rb', line 98

# Looks up a property by its JavaScript-style name. (The name suggests this
# is the property hook used by the JS bridge; confirm against the caller.)
#
# @param key [String] property name; only "locked" is recognised
# @return [Boolean, nil] the value of +locked+ for "locked", otherwise +nil+
def __js_get__(key)
  return nil unless key == "locked"

  locked
end

#__read__ ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/dommy/streams.rb', line 81

# Produces a promise for the next chunk.
#
# * A queued chunk is served first: the promise is fulfilled with
#   +{"value" => chunk, "done" => false}+ regardless of the stream state.
# * With an empty queue, a closed stream fulfils with
#   +{"value" => nil, "done" => true}+ and an errored stream rejects with
#   the stored error reason.
# * Otherwise the promise is parked in +@pending_reads+ and the pull
#   callback, if one was supplied, is invoked with the controller.
#
# @return [PromiseValue] the promise created for this read
def __read__
  PromiseValue.new(@window).tap do |result|
    # `empty?` rather than `any?`: a queued nil/false chunk is still a chunk.
    unless @queue.empty?
      result.fulfill({"value" => @queue.shift, "done" => false})
      next
    end

    case @state
    when :closed
      result.fulfill({"value" => nil, "done" => true})
    when :errored
      result.reject(@error_reason)
    else
      # Park the read before pulling, so a chunk enqueued by the callback
      # finds this promise already waiting.
      @pending_reads << result
      invoke(@pull_callback, [@controller]) if @pull_callback
    end
  end
end

#cancel(_reason = nil) ⇒ Object



46
47
48
49
50
51
# File 'lib/dommy/streams.rb', line 46

# Cancels the stream: discards every queued chunk, marks the stream
# +:closed+, flushes the reads that were waiting, and reports completion.
#
# @param _reason [Object, nil] accepted for API compatibility; not used
# @return [Object] the result of +PromiseValue.resolve(@window, nil)+
def cancel(_reason = nil)
  @queue.clear
  @state = :closed
  flush_pending_reads

  finished = PromiseValue.resolve(@window, nil)
  finished
end

#get_reader ⇒ Object Also known as: getReader

Raises:

  • (Error) — when a reader has already been acquired (the stream is locked)



34
35
36
37
38
# File 'lib/dommy/streams.rb', line 34

# Creates the reader for this stream and remembers it, which locks the
# stream: a second call fails until +@reader+ is cleared.
#
# @return [ReadableStreamDefaultReader] the newly created reader
# @raise [Error] when a reader is already attached
def get_reader
  if @reader
    raise Error, "stream locked"
  end

  reader = ReadableStreamDefaultReader.new(self)
  @reader = reader
end

#locked ⇒ Object



42
43
44
# File 'lib/dommy/streams.rb', line 42

# Tells whether a reader is currently attached to the stream.
#
# @return [Boolean] +false+ while +@reader+ is +nil+, +true+ otherwise
def locked
  return false if @reader.nil?

  true
end