Class: Cloudflare::Queue
- Inherits:
-
Object
- Object
- Cloudflare::Queue
- Defined in:
- lib/homura/runtime/queue.rb
Overview
Producer wrapper. The binding’s JS API:
env.JOBS_QUEUE.send(body, options?) — one message
env.JOBS_QUEUE.sendBatch(messages, options?) — multiple
Each returns a JS Promise resolving to undefined.
Instance Attribute Summary collapse
-
#js ⇒ Object
readonly
Returns the value of attribute js.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #available? ⇒ Boolean
-
#initialize(js, name = nil) ⇒ Queue
constructor
A new instance of Queue.
-
#send(body, delay_seconds: nil, content_type: nil) ⇒ Object
Send one message.
-
#send_batch(messages, delay_seconds: nil) ⇒ Object
Send an Array of `{ body:, delay_seconds?, content_type? }` Hashes or plain bodies as a single batch.
Constructor Details
#initialize(js, name = nil) ⇒ Queue
Returns a new instance of Queue.
60 61 62 63 |
# File 'lib/homura/runtime/queue.rb', line 60
#
# Builds a producer wrapper around a queue binding.
#
# @param js [Object] the underlying binding object; stored unchanged in @js
# @param name [#to_s, nil] label for this queue; "queue" is used when the
#   argument is nil or false, and the result is always coerced with #to_s
def initialize(js, name = nil)
  @js = js
  @name = (name || "queue").to_s
end
Instance Attribute Details
#js ⇒ Object (readonly)
Returns the value of attribute js.
58 59 60 |
# File 'lib/homura/runtime/queue.rb', line 58
#
# Read-only accessor for the wrapped binding.
#
# @return [Object] the current value of @js
def js
  @js
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
58 59 60 |
# File 'lib/homura/runtime/queue.rb', line 58
#
# Read-only accessor for the queue label.
#
# @return [Object] the current value of @name
def name
  @name
end
Instance Method Details
#available? ⇒ Boolean
65 66 67 68 69 70 71 |
# File 'lib/homura/runtime/queue.rb', line 65
#
# Reports whether a binding object is present on this wrapper.
#
# @return [Boolean] false when @js is JS null, JS undefined, or Opal.nil;
#   true for any other value
def available?
  js = @js
  # Opal's Ruby nil is a runtime sentinel (Opal.nil), not JS null.
  # See `lib/homura/runtime/cache.rb#available?` for the same
  # pattern and rationale.
  !!`(#{js} !== null && #{js} !== undefined && #{js} !== Opal.nil)`
end
#send(body, delay_seconds: nil, content_type: nil) ⇒ Object
Send one message. `body` may be any JSON-serialisable Ruby value. Strings / numbers / booleans pass through; Hashes / Arrays are sent as plain JS objects (Workers Queues natively encodes them via structured clone).
delay_seconds: 60 # schedule for ~1 minute from now
content_type: "json" (default) | "text" | "bytes"
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/homura/runtime/queue.rb', line 80
#
# Send one message through the binding's JS `send(body, options)` call.
#
# NOTE: this method is named `send`, so on Queue instances it shadows
# Object#send; use `__send__` for dynamic dispatch on a Queue.
#
# @param body [Object] the message payload; converted with `ruby_to_js`
#   before being handed to the binding
# @param delay_seconds [#to_i, nil] when truthy, set as `delaySeconds`
#   (after #to_i) on the options object
# @param content_type [#to_s, nil] when truthy, set as `contentType`
#   (after #to_s) on the options object
# @return [Object] the JS Promise produced by the async IIFE below; the
#   IIFE returns null once `js.send` has been awaited
# @raise [QueueError] synchronously, when `available?` is false. Failures
#   of the JS call itself are raised as Cloudflare::QueueError from inside
#   the async function instead.
def send(body, delay_seconds: nil, content_type: nil)
  js = @js
  qname = @name
  err_klass = Cloudflare::QueueError

  unless available?
    raise QueueError.new(
      "queue binding not bound",
      operation: "send",
      queue: qname
    )
  end

  js_body = ruby_to_js(body)

  # Start from an empty JS object and attach only the options provided.
  js_opts = `({})`
  `#{js_opts}.delaySeconds = #{delay_seconds.to_i}` if delay_seconds
  `#{js_opts}.contentType = #{content_type.to_s}` if content_type

  # Single-line IIFE — see `lib/homura/runtime/cache.rb#put`
  # for the Opal multi-line x-string quirk. Passing arguments in
  # explicitly (rather than interpolating inside the template)
  # keeps the Promise a first-class expression.
  `(async function(js, body, opts, qname, Kernel, err_klass) { try { await js.send(body, opts); } catch (e) { Kernel.$raise(err_klass.$new(e && e.message ? e.message : String(e), Opal.hash({ operation: 'send', queue: qname }))); } return null; })(#{js}, #{js_body}, #{js_opts}, #{qname}, #{Kernel}, #{err_klass})`
end
#send_batch(messages, delay_seconds: nil) ⇒ Object
Send an Array of `{ body:, delay_seconds?, content_type? }` Hashes or plain bodies as a single batch.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/homura/runtime/queue.rb', line 106
#
# Send several messages through the binding's JS
# `sendBatch(messages, options)` call.
#
# Each element of `messages` is handled in one of two ways:
# * a Hash is read for "body"/:body, "content_type"/:content_type and
#   "delay_seconds"/:delay_seconds (string key first, then symbol key);
# * anything else is treated as the body itself.
#
# @param messages [#each] the messages to enqueue, as described above
# @param delay_seconds [#to_i, nil] when truthy, set as `delaySeconds`
#   (after #to_i) on the batch-level options object
# @return [Object] the JS Promise produced by the async IIFE below; the
#   IIFE returns null once `js.sendBatch` has been awaited
# @raise [QueueError] synchronously, when `available?` is false. Failures
#   of the JS call itself are raised as Cloudflare::QueueError from inside
#   the async function instead.
def send_batch(messages, delay_seconds: nil)
  js = @js
  qname = @name
  err_klass = Cloudflare::QueueError

  unless available?
    raise QueueError.new(
      "queue binding not bound",
      operation: "send_batch",
      queue: qname
    )
  end

  # Build the JS array of `{ body, contentType?, delaySeconds? }` entries.
  js_msgs = `([])`
  messages.each do |m|
    if m.is_a?(Hash)
      body = m["body"] || m[:body]
      ct = m["content_type"] || m[:content_type]
      ds = m["delay_seconds"] || m[:delay_seconds]
      js_body = ruby_to_js(body)
      js_msg = `({ body: #{js_body} })`
      `#{js_msg}.contentType = #{ct.to_s}` if ct
      `#{js_msg}.delaySeconds = #{ds.to_i}` if ds
      `#{js_msgs}.push(#{js_msg})`
    else
      js_body = ruby_to_js(m)
      `#{js_msgs}.push({ body: #{js_body} })`
    end
  end

  js_opts = `({})`
  `#{js_opts}.delaySeconds = #{delay_seconds.to_i}` if delay_seconds

  # Single-line IIFE — the async call is kept on one line because of the
  # Opal multi-line x-string quirk; arguments are passed in explicitly
  # rather than interpolated inside the function body.
  `(async function(js, msgs, opts, qname, Kernel, err_klass) { try { await js.sendBatch(msgs, opts); } catch (e) { Kernel.$raise(err_klass.$new(e && e.message ? e.message : String(e), Opal.hash({ operation: 'send_batch', queue: qname }))); } return null; })(#{js}, #{js_msgs}, #{js_opts}, #{qname}, #{Kernel}, #{err_klass})`
end