Class: Cloudflare::Queue
- Inherits:
-
Object
- Object
- Cloudflare::Queue
- Defined in:
- lib/cloudflare_workers/queue.rb
Overview
Producer wrapper. The binding’s JS API:
env.JOBS_QUEUE.send(body, options?) — one message
env.JOBS_QUEUE.sendBatch(messages, options?) — multiple
Each returns a JS Promise resolving to undefined.
Instance Attribute Summary collapse
-
#js ⇒ Object
readonly
Returns the value of attribute js.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #available? ⇒ Boolean
-
#initialize(js, name = nil) ⇒ Queue
constructor
A new instance of Queue.
-
#send(body, delay_seconds: nil, content_type: nil) ⇒ Object
Send one message.
-
#send_batch(messages, delay_seconds: nil) ⇒ Object
Send an Array of `{ body:, delay_seconds?, content_type? }` Hashes or plain bodies as a single batch.
Constructor Details
#initialize(js, name = nil) ⇒ Queue
Returns a new instance of Queue.
58 59 60 61 |
# File 'lib/cloudflare_workers/queue.rb', line 58
# Build a wrapper around a queue binding.
#
# @param js [Object] the binding object; stored untouched in @js
# @param name [#to_s, nil] label for this queue; 'queue' is used when the
#   argument is nil or false, and the result is always stored as a String
def initialize(js, name = nil)
  label = name || 'queue'
  @js = js
  @name = label.to_s
end
Instance Attribute Details
#js ⇒ Object (readonly)
Returns the value of attribute js.
56 57 58 |
# File 'lib/cloudflare_workers/queue.rb', line 56
# Read-only accessor for the wrapped binding object.
#
# @return [Object] the current value of @js
def js
  @js
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
56 57 58 |
# File 'lib/cloudflare_workers/queue.rb', line 56
# Read-only accessor for the queue's label.
#
# @return [Object] the current value of @name
def name
  @name
end
Instance Method Details
#available? ⇒ Boolean
63 64 65 66 67 68 69 |
# File 'lib/cloudflare_workers/queue.rb', line 63
# Reports whether a usable binding object is present in @js.
#
# Three distinct "absent" values have to be ruled out: JS null, JS undefined
# and Opal.nil. Opal's Ruby nil is a runtime sentinel (Opal.nil), not JS null,
# so a plain null check is not enough. See
# `lib/cloudflare_workers/cache.rb#available?` for the same pattern and
# rationale.
#
# @return [Boolean] true when @js is none of null, undefined or Opal.nil
def available?
  handle = @js
  !!`(#{handle} !== null && #{handle} !== undefined && #{handle} !== Opal.nil)`
end
#send(body, delay_seconds: nil, content_type: nil) ⇒ Object
Send one message. `body` may be any JSON-serialisable Ruby value. Strings / numbers / booleans pass through; Hashes / Arrays are sent as plain JS objects (Workers Queues natively encodes them via structured clone).
delay_seconds: 60 # schedule for ~1 minute from now
content_type: "json" (default) | "text" | "bytes"
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/cloudflare_workers/queue.rb', line 78
# Enqueue a single message through the binding.
#
# @param body [Object] message payload; converted with ruby_to_js before it is
#   handed to the binding
# @param delay_seconds [#to_i, nil] when given, set as the `delaySeconds` option
# @param content_type [#to_s, nil] when given, set as the `contentType` option
# @return [Object] the Promise produced by the async JS wrapper, which
#   resolves to null when the binding's send succeeds
# @raise [Cloudflare::QueueError] synchronously when the binding is not
#   available; a failure of the JS call is re-raised as the same error class
#   from inside the async wrapper
def send(body, delay_seconds: nil, content_type: nil)
  binding_js = @js
  queue_name = @name
  error_class = Cloudflare::QueueError
  unless available?
    raise QueueError.new('queue binding not bound', operation: 'send', queue: queue_name)
  end

  payload = ruby_to_js(body)
  options = `({})`
  if delay_seconds
    `#{options}.delaySeconds = #{delay_seconds.to_i}`
  end
  if content_type
    `#{options}.contentType = #{content_type.to_s}`
  end

  # The IIFE must stay on one line — see `lib/cloudflare_workers/cache.rb#put`
  # for the Opal multi-line x-string quirk. Values are passed in as call
  # arguments (not interpolated inside the function body) so the Promise
  # remains a first-class expression and is this method's return value.
  `(async function(js, body, opts, qname, Kernel, err_klass) { try { await js.send(body, opts); } catch (e) { Kernel.$raise(err_klass.$new(e && e.message ? e.message : String(e), Opal.hash({ operation: 'send', queue: qname }))); } return null; })(#{binding_js}, #{payload}, #{options}, #{queue_name}, #{Kernel}, #{error_class})`
end
#send_batch(messages, delay_seconds: nil) ⇒ Object
Send an Array of `{ body:, delay_seconds?, content_type? }` Hashes or plain bodies as a single batch.
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/cloudflare_workers/queue.rb', line 98
# Send several messages to the binding as a single batch.
#
# @param messages [Array] each element is either a Hash or a plain body value.
#   A Hash is read for `body`, `content_type` and `delay_seconds` (String key
#   first, then Symbol key); anything else is sent as the body itself.
# @param delay_seconds [#to_i, nil] when given, set as the batch-level
#   `delaySeconds` option
# @return [Object] the Promise produced by the async JS wrapper, which
#   resolves to null when the binding's sendBatch succeeds
# @raise [Cloudflare::QueueError] synchronously when the binding is not
#   available; a failure of the JS call is re-raised as the same error class
#   from inside the async wrapper
def send_batch(messages, delay_seconds: nil)
  js = @js
  qname = @name
  err_klass = Cloudflare::QueueError
  raise QueueError.new('queue binding not bound', operation: 'send_batch', queue: qname) unless available?

  # Build the JS array of `{ body, contentType?, delaySeconds? }` entries.
  js_msgs = `([])`
  messages.each do |m|
    if m.is_a?(Hash)
      body = m['body'] || m[:body]
      ct = m['content_type'] || m[:content_type]
      ds = m['delay_seconds'] || m[:delay_seconds]
      js_body = ruby_to_js(body)
      js_msg = `({ body: #{js_body} })`
      `#{js_msg}.contentType = #{ct.to_s}` if ct
      `#{js_msg}.delaySeconds = #{ds.to_i}` if ds
      `#{js_msgs}.push(#{js_msg})`
    else
      js_body = ruby_to_js(m)
      `#{js_msgs}.push({ body: #{js_body} })`
    end
  end

  js_opts = `({})`
  `#{js_opts}.delaySeconds = #{delay_seconds.to_i}` if delay_seconds

  # The IIFE is kept on a single line and receives its values as call
  # arguments so the resulting Promise is this method's return value.
  `(async function(js, msgs, opts, qname, Kernel, err_klass) { try { await js.sendBatch(msgs, opts); } catch (e) { Kernel.$raise(err_klass.$new(e && e.message ? e.message : String(e), Opal.hash({ operation: 'send_batch', queue: qname }))); } return null; })(#{js}, #{js_msgs}, #{js_opts}, #{qname}, #{Kernel}, #{err_klass})`
end