Module: Cloudflare::QueueConsumer
Defined in: lib/homura/runtime/queue.rb
Overview
Consumer dispatcher. The Sinatra DSL (`consume_queue`) registers handlers here via `Cloudflare::QueueConsumer.register(queue_name, unbound_method)`. `src/worker.mjs#queue` calls `globalThis.__HOMURA_QUEUE_DISPATCH__`, which forwards into `dispatch_js` below.
Class Attribute Summary
- .handlers ⇒ Object (readonly)
  Returns the value of attribute handlers.
Class Method Summary
- .dispatch_js(js_batch, js_env, js_ctx) ⇒ Object
  Dispatcher called from the JS hook.
- .handler_for(queue_name) ⇒ Object
- .handlers_by_queue ⇒ Object
- .install_dispatcher ⇒ Object
  Single-line backtick (see scheduled.rb for the Opal multi-line constraint).
- .register(queue_name, unbound_method) ⇒ Object
Class Attribute Details
.handlers ⇒ Object (readonly)
Returns the value of attribute handlers.
# File 'lib/homura/runtime/queue.rb', line 332

def handlers
  @handlers
end
Class Method Details
.dispatch_js(js_batch, js_env, js_ctx) ⇒ Object
Dispatcher called from the JS hook. `js_batch` is the MessageBatch; `js_env` and `js_ctx` are the Workers env and ExecutionContext. Returns a Hash summary with string keys (`"queue"`, `"handled"`, `"size"`, plus `"result"` on success or `"reason"` when no handler is registered) for diagnostics.
# File 'lib/homura/runtime/queue.rb', line 351

def self.dispatch_js(js_batch, js_env, js_ctx)
  batch = QueueBatch.new(js_batch)
  queue_name = batch.queue
  handler = handler_for(queue_name)
  if handler.nil?
    warn "[Cloudflare::QueueConsumer] no handler registered for queue #{queue_name.inspect}; messages will time out and retry"
    return(
      {
        "queue" => queue_name,
        "handled" => false,
        "size" => batch.size,
        "reason" => "no_handler"
      }
    )
  end
  ctx = QueueContext.new(batch, js_env, js_ctx)
  result = handler.bind(ctx).call(batch)
  if `(#{result} != null && typeof #{result}.then === 'function')`
    result = result.__await__
  end
  {
    "queue" => queue_name,
    "handled" => true,
    "size" => batch.size,
    "result" => result.is_a?(Hash) ? result : nil
  }
end
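The bind-and-call flow in `dispatch_js` can be sketched in plain Ruby (not Opal). This is a minimal illustration under stated assumptions: `FakeBatch`, `FakeContext`, `FakeHandlers`, `HANDLERS`, and `demo_dispatch` are hypothetical stand-ins, not part of the library, and the Promise check with `__await__` is omitted because it only applies under Opal.

```ruby
# Hypothetical stand-in for QueueBatch: exposes queue name, messages, and size.
FakeBatch = Struct.new(:queue, :messages) do
  def size
    messages.size
  end
end

# Hypothetical stand-in for QueueContext: the object the handler is bound to.
class FakeContext
  attr_reader :batch, :seen

  def initialize(batch)
    @batch = batch
    @seen = []
  end
end

# Handlers live in a module so the UnboundMethod can be bound to any object.
module FakeHandlers
  def on_jobs(batch)
    batch.messages.each { |m| seen << m } # `self` is the bound context here
    { "processed" => batch.size }
  end
end

HANDLERS = { "jobs" => FakeHandlers.instance_method(:on_jobs) }

# Mirrors the two return shapes of dispatch_js (no handler / handled).
def demo_dispatch(batch)
  handler = HANDLERS[batch.queue.to_s]
  if handler.nil?
    return { "queue" => batch.queue, "handled" => false,
             "size" => batch.size, "reason" => "no_handler" }
  end
  ctx = FakeContext.new(batch)
  result = handler.bind(ctx).call(batch)
  { "queue" => batch.queue, "handled" => true, "size" => batch.size,
    "result" => result.is_a?(Hash) ? result : nil }
end

handled = demo_dispatch(FakeBatch.new("jobs", %w[a b]))
unhandled = demo_dispatch(FakeBatch.new("unknown", []))
```

Binding the handler to a fresh context per batch means the handler body sees the context's methods as if they were its own, which appears to be what lets the DSL block reach the env and the batch without extra arguments.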
.handler_for(queue_name) ⇒ Object
# File 'lib/homura/runtime/queue.rb', line 340

def self.handler_for(queue_name)
  (@handlers || {})[queue_name.to_s]
end
.handlers_by_queue ⇒ Object
# File 'lib/homura/runtime/queue.rb', line 344

def self.handlers_by_queue
  (@handlers || {}).dup
end
.install_dispatcher ⇒ Object
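Installed as a single-line backtick (see scheduled.rb for the Opal multi-line constraint). Logs and swallows thrown errors so one bad handler doesn’t crash the Workers queue consumer; on failure the hook returns a summary object with `handled: false` and the error message.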
# File 'lib/homura/runtime/queue.rb', line 383

def self.install_dispatcher
  mod = self
  `globalThis.__HOMURA_QUEUE_DISPATCH__ = async function(js_batch, js_env, js_ctx) { try { return await #{mod}.$dispatch_js(js_batch, js_env, js_ctx); } catch (err) { try { globalThis.console.error('[Cloudflare::QueueConsumer] dispatch failed:', err && err.stack || err); } catch (e) {} return { queue: (js_batch && js_batch.queue) || '', handled: false, size: (js_batch && Array.isArray(js_batch.messages) ? js_batch.messages.length : 0), error: String(err && err.message || err) }; } };(function(){var g=globalThis;g.__OPAL_WORKERS__=g.__OPAL_WORKERS__||{};g.__OPAL_WORKERS__.queue=g.__HOMURA_QUEUE_DISPATCH__;})();`
end
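The log-and-swallow behaviour of the installed hook is written in JavaScript inside the backtick, but the same pattern can be sketched in plain Ruby for readers who want to reason about it. This is only an analogue: `guarded_dispatch` is a hypothetical helper, and the fallback hash copies the field names of the JS fallback object.

```ruby
# Hypothetical Ruby analogue of the JS try/catch wrapper: run the dispatch
# block, and on any StandardError log it and return a failure summary
# instead of letting the error propagate to the caller.
def guarded_dispatch(queue_name, size)
  yield
rescue StandardError => err
  warn "[Cloudflare::QueueConsumer] dispatch failed: #{err.message}"
  { "queue" => queue_name, "handled" => false, "size" => size, "error" => err.message }
end

ok = guarded_dispatch("jobs", 1) { { "handled" => true } }
failed = guarded_dispatch("jobs", 3) { raise "boom" }
```

The design choice is that a failing handler yields a diagnostic summary rather than an exception, so the caller always receives a value it can inspect.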
.register(queue_name, unbound_method) ⇒ Object
# File 'lib/homura/runtime/queue.rb', line 335

def self.register(queue_name, unbound_method)
  @handlers ||= {}
  @handlers[queue_name.to_s] = unbound_method
end
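The registry methods (`register`, `handler_for`, `handlers_by_queue`) can be exercised together in a small stand-alone sketch. `DemoRegistry` and `DemoHandlers` are hypothetical names that copy the three method bodies shown on this page so the example runs without the library. It illustrates two properties visible in the source: queue names are normalised with `to_s`, and `handlers_by_queue` returns a copy.

```ruby
# Hypothetical stand-in that mirrors the three registry methods.
module DemoRegistry
  class << self
    def register(queue_name, unbound_method)
      @handlers ||= {}
      @handlers[queue_name.to_s] = unbound_method
    end

    def handler_for(queue_name)
      (@handlers || {})[queue_name.to_s]
    end

    def handlers_by_queue
      (@handlers || {}).dup
    end
  end
end

# Hypothetical handler container; instance_method yields an UnboundMethod.
module DemoHandlers
  def on_jobs(batch)
    batch.size
  end
end

handler = DemoHandlers.instance_method(:on_jobs)
DemoRegistry.register(:jobs, handler)

# Symbol and String names resolve to the same entry because of to_s.
same_handler = DemoRegistry.handler_for("jobs").equal?(handler)
missing = DemoRegistry.handler_for("other")

# Mutating the returned copy does not affect the registry itself.
snapshot = DemoRegistry.handlers_by_queue
snapshot.delete("jobs")
still_registered = !DemoRegistry.handler_for(:jobs).nil?
```

Because the key is always a String, registering the same queue twice under `:jobs` and `"jobs"` overwrites the earlier handler rather than creating two entries.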