Class: Cloudflare::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/homura/runtime/queue.rb

Overview

Producer wrapper. The binding’s JS API:

env.JOBS_QUEUE.send(body, options?)            — one message
env.JOBS_QUEUE.sendBatch(messages, options?)   — multiple

Each returns a JS Promise resolving to undefined.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(js, name = nil) ⇒ Queue

Returns a new instance of Queue.



60
61
62
63
# File 'lib/homura/runtime/queue.rb', line 60

# Wraps a queue binding object together with a display name.
#
# @param js [Object] the binding object to wrap; stored as-is in @js
# @param name [#to_s, nil] label for this queue; when nil (or false)
#   the literal "queue" is used instead
def initialize(js, name = nil)
  # Any truthy name is stringified; a missing name falls back to "queue".
  @name = name ? name.to_s : "queue"
  @js = js
end

Instance Attribute Details

#js ⇒ Object (readonly)

Returns the value of attribute js.



58
59
60
# File 'lib/homura/runtime/queue.rb', line 58

# Reader for the @js instance variable, i.e. the object handed to the
# constructor as its `js` argument.
#
# @return [Object] the stored value, unmodified
def js
  @js
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



58
59
60
# File 'lib/homura/runtime/queue.rb', line 58

# Reader for the @name instance variable. The constructor stores it as a
# String ("queue" when no name was supplied).
#
# @return [String] the queue label
def name
  @name
end

Instance Method Details

#available? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
68
69
70
71
# File 'lib/homura/runtime/queue.rb', line 65

# Reports whether a usable binding object is held in @js.
#
# @return [Boolean] false when @js is JS null, JS undefined, or Opal's
#   nil sentinel; true otherwise
def available?
  binding_ref = @js
  # Opal's Ruby nil is a runtime sentinel (Opal.nil), not JS null, so it
  # needs its own comparison. The loose `!= null` covers both JS null
  # and JS undefined in a single test.
  # See `lib/homura/runtime/cache.rb#available?` for the same
  # pattern and rationale.
  present = `(#{binding_ref} != null && #{binding_ref} !== Opal.nil)`
  !!present
end

#send(body, delay_seconds: nil, content_type: nil) ⇒ Object

Send one message. `body` may be any JSON-serialisable Ruby value. Strings / numbers / booleans pass through; Hashes / Arrays are sent as plain JS objects (Workers Queues natively encodes them via structured clone).

delay_seconds: 60   # schedule for ~1 minute from now
content_type: "json" (default) | "text" | "bytes"


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/homura/runtime/queue.rb', line 80

# Sends a single message through the wrapped binding's `send`.
#
# @param body [Object] message payload; converted with `ruby_to_js`
#   before being handed to the binding
# @param delay_seconds [#to_i, nil] when given, set as `delaySeconds`
#   (after `to_i`) on the options object
# @param content_type [#to_s, nil] when given, set as `contentType`
#   (after `to_s`) on the options object
# @return [Object] the Promise produced by the async IIFE below; it
#   yields JS null once the binding's `send` has completed
# @raise [QueueError] synchronously, when no binding is available. A
#   failure inside the binding's `send` is re-raised as
#   Cloudflare::QueueError from within the async function instead.
def send(body, delay_seconds: nil, content_type: nil)
  binding_obj = @js
  queue_name = @name
  error_class = Cloudflare::QueueError
  raise QueueError.new("queue binding not bound", operation: "send", queue: queue_name) unless available?

  payload = ruby_to_js(body)

  # Start from an empty JS object and attach only the options supplied.
  options = `({})`
  if delay_seconds
    `#{options}.delaySeconds = #{delay_seconds.to_i}`
  end
  if content_type
    `#{options}.contentType  = #{content_type.to_s}`
  end

  # Single-line IIFE — see `lib/homura/runtime/cache.rb#put`
  # for the Opal multi-line x-string quirk. Passing arguments in
  # explicitly (rather than interpolating inside the template)
  # keeps the Promise a first-class expression.
  `(async function(js, body, opts, qname, Kernel, err_klass) { try { await js.send(body, opts); } catch (e) { Kernel.$raise(err_klass.$new(e && e.message ? e.message : String(e), Opal.hash({ operation: 'send', queue: qname }))); } return null; })(#{binding_obj}, #{payload}, #{options}, #{queue_name}, #{Kernel}, #{error_class})`
end

#send_batch(messages, delay_seconds: nil) ⇒ Object

Send an Array of `{ body:, delay_seconds?, content_type? }` Hashes or plain bodies as a single batch.



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/homura/runtime/queue.rb', line 106

# Sends several messages in one call to the wrapped binding's `sendBatch`.
#
# Each element of `messages` is either a Hash or a plain body:
# * Hash — its "body"/:body entry becomes the payload, and its optional
#   "content_type"/:content_type and "delay_seconds"/:delay_seconds
#   entries become `contentType` / `delaySeconds` on that message.
# * anything else — used directly as the payload.
# Payloads are converted with `ruby_to_js`.
#
# @param messages [#each] the messages to send
# @param delay_seconds [#to_i, nil] when given, set as `delaySeconds`
#   (after `to_i`) on the batch-level options object
# @return [Object] the Promise produced by the async IIFE below; it
#   yields JS null once the binding's `sendBatch` has completed
# @raise [QueueError] synchronously, when no binding is available. A
#   failure inside the binding's `sendBatch` is re-raised as
#   Cloudflare::QueueError from within the async function instead.
def send_batch(messages, delay_seconds: nil)
  binding_obj = @js
  queue_name = @name
  error_class = Cloudflare::QueueError
  raise QueueError.new("queue binding not bound", operation: "send_batch", queue: queue_name) unless available?

  # Accumulate the outgoing messages in a plain JS array.
  batch = `([])`
  messages.each do |entry|
    if entry.is_a?(Hash)
      # String keys are consulted first, then symbol keys.
      raw_body = entry["body"] || entry[:body]
      type_hint = entry["content_type"] || entry[:content_type]
      delay = entry["delay_seconds"] || entry[:delay_seconds]

      payload = ruby_to_js(raw_body)
      item = `({ body: #{payload} })`
      if type_hint
        `#{item}.contentType  = #{type_hint.to_s}`
      end
      if delay
        `#{item}.delaySeconds = #{delay.to_i}`
      end
      `#{batch}.push(#{item})`
    else
      payload = ruby_to_js(entry)
      `#{batch}.push({ body: #{payload} })`
    end
  end

  # Batch-level options: only delaySeconds, and only when supplied.
  options = `({})`
  if delay_seconds
    `#{options}.delaySeconds = #{delay_seconds.to_i}`
  end

  # Single-line IIFE, with every value passed in as an argument rather
  # than interpolated inside the function body.
  # NOTE(review): presumably single-line for the same Opal multi-line
  # x-string reason as `send` — confirm.
  `(async function(js, msgs, opts, qname, Kernel, err_klass) { try { await js.sendBatch(msgs, opts); } catch (e) { Kernel.$raise(err_klass.$new(e && e.message ? e.message : String(e), Opal.hash({ operation: 'send_batch', queue: qname }))); } return null; })(#{binding_obj}, #{batch}, #{options}, #{queue_name}, #{Kernel}, #{error_class})`
end