Class: Cloudflare::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudflare_workers/queue.rb

Overview

Producer wrapper. The binding’s JS API:

env.JOBS_QUEUE.send(body, options?)            — one message
env.JOBS_QUEUE.sendBatch(messages, options?)   — multiple

Each returns a JS Promise resolving to undefined.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(js, name = nil) ⇒ Queue

Returns a new instance of Queue.



58
59
60
61
# File 'lib/cloudflare_workers/queue.rb', line 58

def initialize(js, name = nil)
  @js = js
  @name = (name || 'queue').to_s
end

Instance Attribute Details

#jsObject (readonly)

Returns the value of attribute js.



56
57
58
# File 'lib/cloudflare_workers/queue.rb', line 56

def js
  @js
end

#nameObject (readonly)

Returns the value of attribute name.



56
57
58
# File 'lib/cloudflare_workers/queue.rb', line 56

def name
  @name
end

Instance Method Details

#available?Boolean

Returns:

  • (Boolean)


63
64
65
66
67
68
69
# File 'lib/cloudflare_workers/queue.rb', line 63

def available?
  js = @js
  # Opal's Ruby nil is a runtime sentinel (Opal.nil), not JS null.
  # See `lib/cloudflare_workers/cache.rb#available?` for the same
  # pattern and rationale.
  !!`(#{js} !== null && #{js} !== undefined && #{js} !== Opal.nil)`
end

#send(body, delay_seconds: nil, content_type: nil) ⇒ Object

Send one message. ‘body` may be any JSON-serialisable Ruby value. Strings / numbers / booleans pass through; Hashes / Arrays are sent as plain JS objects (Workers Queues natively encodes them via structured clone).

delay_seconds: 60   # schedule for ~1 minute from now
content_type: "json" (default) | "text" | "bytes"

Raises:



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/cloudflare_workers/queue.rb', line 78

def send(body, delay_seconds: nil, content_type: nil)
  js = @js
  qname = @name
  err_klass = Cloudflare::QueueError
  raise QueueError.new('queue binding not bound', operation: 'send', queue: qname) unless available?

  js_body = ruby_to_js(body)
  js_opts = `({})`
  `#{js_opts}.delaySeconds = #{delay_seconds.to_i}` if delay_seconds
  `#{js_opts}.contentType  = #{content_type.to_s}`  if content_type

  # Single-line IIFE — see `lib/cloudflare_workers/cache.rb#put`
  # for the Opal multi-line x-string quirk. Passing arguments in
  # explicitly (rather than interpolating inside the template)
  # keeps the Promise a first-class expression.
  `(async function(js, body, opts, qname, Kernel, err_klass) { try { await js.send(body, opts); } catch (e) { Kernel.$raise(err_klass.$new(e && e.message ? e.message : String(e), Opal.hash({ operation: 'send', queue: qname }))); } return null; })(#{js}, #{js_body}, #{js_opts}, #{qname}, #{Kernel}, #{err_klass})`
end

#send_batch(messages, delay_seconds: nil) ⇒ Object

Send an Array of delay_seconds?, content_type? Hashes or plain bodies as a single batch.

Raises:



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/cloudflare_workers/queue.rb', line 98

def send_batch(messages, delay_seconds: nil)
  js = @js
  qname = @name
  err_klass = Cloudflare::QueueError
  raise QueueError.new('queue binding not bound', operation: 'send_batch', queue: qname) unless available?

  js_msgs = `([])`
  messages.each do |m|
    if m.is_a?(Hash)
      body = m['body'] || m[:body]
      ct   = m['content_type'] || m[:content_type]
      ds   = m['delay_seconds'] || m[:delay_seconds]
      js_body = ruby_to_js(body)
      js_msg = `({ body: #{js_body} })`
      `#{js_msg}.contentType  = #{ct.to_s}` if ct
      `#{js_msg}.delaySeconds = #{ds.to_i}` if ds
      `#{js_msgs}.push(#{js_msg})`
    else
      js_body = ruby_to_js(m)
      `#{js_msgs}.push({ body: #{js_body} })`
    end
  end
  js_opts = `({})`
  `#{js_opts}.delaySeconds = #{delay_seconds.to_i}` if delay_seconds

  # Single-line IIFE — see `send` above for rationale.
  `(async function(js, msgs, opts, qname, Kernel, err_klass) { try { await js.sendBatch(msgs, opts); } catch (e) { Kernel.$raise(err_klass.$new(e && e.message ? e.message : String(e), Opal.hash({ operation: 'send_batch', queue: qname }))); } return null; })(#{js}, #{js_msgs}, #{js_opts}, #{qname}, #{Kernel}, #{err_klass})`
end