Module: Cloudflare::QueueConsumer

Defined in:
lib/cloudflare_workers/queue.rb

Overview

Consumer dispatcher. The Sinatra DSL (‘consume_queue`) registers handlers here via `Cloudflare::QueueConsumer.register(queue_name, unbound_method)`. `src/worker.mjs#queue` calls `globalThis.HOMURA_QUEUE_DISPATCH` which forwards into `dispatch_js` below.

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.handlersObject (readonly)

Returns the value of attribute handlers.



311
312
313
# File 'lib/cloudflare_workers/queue.rb', line 311

def handlers
  @handlers
end

Class Method Details

.dispatch_js(js_batch, js_env, js_ctx) ⇒ Object

Dispatcher called from the JS hook. ‘js_batch` is the MessageBatch, `js_env`, `js_ctx` are the Workers env and ExecutionContext. Returns a Hash summary ({ queue:, handled:, size: }) for diagnostics.



330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/cloudflare_workers/queue.rb', line 330

def self.dispatch_js(js_batch, js_env, js_ctx)
  batch = QueueBatch.new(js_batch)
  queue_name = batch.queue
  handler = handler_for(queue_name)
  if handler.nil?
    warn "[Cloudflare::QueueConsumer] no handler registered for queue #{queue_name.inspect}; messages will time out and retry"
    return { 'queue' => queue_name, 'handled' => false, 'size' => batch.size, 'reason' => 'no_handler' }
  end

  ctx = QueueContext.new(batch, js_env, js_ctx)
  result = handler.bind(ctx).call(batch)
  if `(#{result} != null && typeof #{result}.then === 'function')`
    result = result.__await__
  end
  { 'queue' => queue_name, 'handled' => true, 'size' => batch.size, 'result' => result.is_a?(Hash) ? result : nil }
end

.handler_for(queue_name) ⇒ Object



319
320
321
# File 'lib/cloudflare_workers/queue.rb', line 319

def self.handler_for(queue_name)
  (@handlers || {})[queue_name.to_s]
end

.handlers_by_queueObject



323
324
325
# File 'lib/cloudflare_workers/queue.rb', line 323

def self.handlers_by_queue
  (@handlers || {}).dup
end

.install_dispatcherObject

Single-line backtick (see scheduled.rb for the Opal multi-line constraint). Logs+swallows thrown errors so one bad handler doesn’t crash the Workers queue consumer.



350
351
352
353
# File 'lib/cloudflare_workers/queue.rb', line 350

def self.install_dispatcher
  mod = self
  `globalThis.__HOMURA_QUEUE_DISPATCH__ = async function(js_batch, js_env, js_ctx) { try { return await #{mod}.$dispatch_js(js_batch, js_env, js_ctx); } catch (err) { try { globalThis.console.error('[Cloudflare::QueueConsumer] dispatch failed:', err && err.stack || err); } catch (e) {} return { queue: (js_batch && js_batch.queue) || '', handled: false, size: (js_batch && Array.isArray(js_batch.messages) ? js_batch.messages.length : 0), error: String(err && err.message || err) }; } };(function(){var g=globalThis;g.__OPAL_WORKERS__=g.__OPAL_WORKERS__||{};g.__OPAL_WORKERS__.queue=g.__HOMURA_QUEUE_DISPATCH__;})();`
end

.register(queue_name, unbound_method) ⇒ Object



314
315
316
317
# File 'lib/cloudflare_workers/queue.rb', line 314

def self.register(queue_name, unbound_method)
  @handlers ||= {}
  @handlers[queue_name.to_s] = unbound_method
end