Module: Cloudflare::QueueConsumer
- Defined in:
- lib/cloudflare_workers/queue.rb
Overview
Consumer dispatcher. The Sinatra DSL (‘consume_queue`) registers handlers here via `Cloudflare::QueueConsumer.register(queue_name, unbound_method)`. `src/worker.mjs#queue` calls `globalThis.HOMURA_QUEUE_DISPATCH` which forwards into `dispatch_js` below.
Class Attribute Summary collapse
-
.handlers ⇒ Object
readonly
Returns the value of attribute handlers.
Class Method Summary collapse
-
.dispatch_js(js_batch, js_env, js_ctx) ⇒ Object
Dispatcher called from the JS hook.
- .handler_for(queue_name) ⇒ Object
- .handlers_by_queue ⇒ Object
-
.install_dispatcher ⇒ Object
Single-line backtick (see scheduled.rb for the Opal multi-line constraint).
- .register(queue_name, unbound_method) ⇒ Object
Class Attribute Details
.handlers ⇒ Object (readonly)
Returns the value of attribute handlers.
311 312 313 |
# File 'lib/cloudflare_workers/queue.rb', line 311 def handlers @handlers end |
Class Method Details
.dispatch_js(js_batch, js_env, js_ctx) ⇒ Object
Dispatcher called from the JS hook. ‘js_batch` is the MessageBatch, `js_env`, `js_ctx` are the Workers env and ExecutionContext. Returns a Hash summary ({ queue:, handled:, size: }) for diagnostics.
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 |
# File 'lib/cloudflare_workers/queue.rb', line 330 def self.dispatch_js(js_batch, js_env, js_ctx) batch = QueueBatch.new(js_batch) queue_name = batch.queue handler = handler_for(queue_name) if handler.nil? warn "[Cloudflare::QueueConsumer] no handler registered for queue #{queue_name.inspect}; messages will time out and retry" return { 'queue' => queue_name, 'handled' => false, 'size' => batch.size, 'reason' => 'no_handler' } end ctx = QueueContext.new(batch, js_env, js_ctx) result = handler.bind(ctx).call(batch) if `(#{result} != null && typeof #{result}.then === 'function')` result = result.__await__ end { 'queue' => queue_name, 'handled' => true, 'size' => batch.size, 'result' => result.is_a?(Hash) ? result : nil } end |
.handler_for(queue_name) ⇒ Object
319 320 321 |
# File 'lib/cloudflare_workers/queue.rb', line 319 def self.handler_for(queue_name) (@handlers || {})[queue_name.to_s] end |
.handlers_by_queue ⇒ Object
323 324 325 |
# File 'lib/cloudflare_workers/queue.rb', line 323 def self.handlers_by_queue (@handlers || {}).dup end |
.install_dispatcher ⇒ Object
Single-line backtick (see scheduled.rb for the Opal multi-line constraint). Logs+swallows thrown errors so one bad handler doesn’t crash the Workers queue consumer.
350 351 352 353 |
# File 'lib/cloudflare_workers/queue.rb', line 350 def self.install_dispatcher mod = self `globalThis.__HOMURA_QUEUE_DISPATCH__ = async function(js_batch, js_env, js_ctx) { try { return await #{mod}.$dispatch_js(js_batch, js_env, js_ctx); } catch (err) { try { globalThis.console.error('[Cloudflare::QueueConsumer] dispatch failed:', err && err.stack || err); } catch (e) {} return { queue: (js_batch && js_batch.queue) || '', handled: false, size: (js_batch && Array.isArray(js_batch.messages) ? js_batch.messages.length : 0), error: String(err && err.message || err) }; } };(function(){var g=globalThis;g.__OPAL_WORKERS__=g.__OPAL_WORKERS__||{};g.__OPAL_WORKERS__.queue=g.__HOMURA_QUEUE_DISPATCH__;})();` end |
.register(queue_name, unbound_method) ⇒ Object
314 315 316 317 |
# File 'lib/cloudflare_workers/queue.rb', line 314 def self.register(queue_name, unbound_method) @handlers ||= {} @handlers[queue_name.to_s] = unbound_method end |