Module: Cloudflare::QueueConsumer

Defined in:
lib/homura/runtime/queue.rb

Overview

Consumer dispatcher. The Sinatra DSL (`consume_queue`) registers handlers here via `Cloudflare::QueueConsumer.register(queue_name, unbound_method)`. `src/worker.mjs#queue` calls `globalThis.__HOMURA_QUEUE_DISPATCH__`, which forwards to `dispatch_js` below.

Class Attribute Details

.handlers ⇒ Object (readonly)

Returns the value of attribute handlers.



# File 'lib/homura/runtime/queue.rb', line 352

def handlers
  @handlers
end

Class Method Details

.dispatch_js(js_batch, js_env, js_ctx) ⇒ Object

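Dispatcher called from the JS hook. `js_batch` is the MessageBatch; `js_env` and `js_ctx` are the Workers env and ExecutionContext. Returns a Hash summary for diagnostics with string keys `"queue"`, `"handled"`, and `"size"`, plus `"reason"` when no handler is registered or `"result"` when a handler ran.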



# File 'lib/homura/runtime/queue.rb', line 371

def self.dispatch_js(js_batch, js_env, js_ctx)
  batch = QueueBatch.new(js_batch)
  queue_name = batch.queue
  handler = handler_for(queue_name)
  if handler.nil?
    warn(
      "[Cloudflare::QueueConsumer] no handler registered for queue #{queue_name.inspect}; messages will time out and retry"
    )
    return ({
      "queue" => queue_name,
      "handled" => false,
      "size" => batch.size,
      "reason" => "no_handler"
    })
  end

  ctx = QueueContext.new(batch, js_env, js_ctx)
  result = handler.bind(ctx).call(batch)
  if `(#{result} != null && typeof #{result}.then === 'function')`
    result = result.__await__
  end

  {
    "queue" => queue_name,
    "handled" => true,
    "size" => batch.size,
    "result" => result.is_a?(Hash) ? result : nil
  }
end
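
The control flow above can be sketched in plain Ruby. This is a minimal illustration, not the library's code: `SketchBatch`, `SketchContext`, and `DispatchSketch` are hypothetical stand-ins for `QueueBatch`, `QueueContext`, and this module, and the Promise check with `__await__` is left out because it only applies under Opal.

```ruby
# Plain-Ruby stand-ins: the real QueueBatch and QueueContext wrap JS objects.
class SketchBatch
  attr_reader :queue, :messages

  def initialize(queue, messages)
    @queue = queue
    @messages = messages
  end

  def size
    messages.size
  end
end

class SketchContext
  def initialize(batch)
    @batch = batch
  end

  # Stands in for a handler body captured by the DSL (hypothetical).
  def count_messages(batch)
    { "seen" => batch.size, "context" => self.class.name }
  end
end

module DispatchSketch
  @handlers = {}

  def self.register(queue_name, unbound_method)
    @handlers[queue_name.to_s] = unbound_method
  end

  # Mirrors the two return shapes of dispatch_js, without the Promise await.
  def self.dispatch(batch)
    summary = { "queue" => batch.queue, "size" => batch.size }
    handler = @handlers[batch.queue.to_s]
    return summary.merge("handled" => false, "reason" => "no_handler") if handler.nil?

    # bind makes the context `self` for the duration of the handler call.
    result = handler.bind(SketchContext.new(batch)).call(batch)
    payload = result.is_a?(Hash) ? result : nil
    summary.merge("handled" => true, "result" => payload)
  end
end

DispatchSketch.register("jobs", SketchContext.instance_method(:count_messages))
handled = DispatchSketch.dispatch(SketchBatch.new("jobs", %w[a b c]))
unhandled = DispatchSketch.dispatch(SketchBatch.new("emails", %w[x]))
```

Here `handled` carries `"handled" => true` with the handler's Hash under `"result"`, while `unhandled` carries `"handled" => false` and `"reason" => "no_handler"`. A non-Hash handler return value is reported as `nil` under `"result"`.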

.handler_for(queue_name) ⇒ Object



# File 'lib/homura/runtime/queue.rb', line 360

def self.handler_for(queue_name)
  (@handlers || {})[queue_name.to_s]
end
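
Because both `register` and `handler_for` normalize the queue name with `to_s`, a Symbol and a String address the same entry, and a lookup before any registration returns `nil` rather than raising. The sketch below reproduces that behavior in a hypothetical stand-in module (`LookupSketch`); the handler value is a placeholder Symbol rather than a real unbound method.

```ruby
# Hypothetical stand-in that copies the lookup logic shown above.
module LookupSketch
  def self.register(queue_name, handler)
    @handlers ||= {}
    @handlers[queue_name.to_s] = handler
  end

  def self.handler_for(queue_name)
    (@handlers || {})[queue_name.to_s]
  end
end

before = LookupSketch.handler_for("jobs") # registry not created yet
LookupSketch.register(:jobs, :placeholder_handler)
by_string = LookupSketch.handler_for("jobs")
by_symbol = LookupSketch.handler_for(:jobs)
missing = LookupSketch.handler_for("emails")
```

After this runs, `before` and `missing` are `nil`, and `by_string` and `by_symbol` both hold the registered placeholder.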

.handlers_by_queue ⇒ Object



# File 'lib/homura/runtime/queue.rb', line 364

def self.handlers_by_queue
  (@handlers || {}).dup
end
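
The `dup` means callers receive a shallow copy of the registry, so adding or deleting keys on the returned Hash does not change which handlers are registered. A small sketch of that property, using a hypothetical stand-in module (`SnapshotSketch`) and placeholder Symbols in place of unbound methods:

```ruby
# Hypothetical stand-in that copies the method shown above.
module SnapshotSketch
  @handlers = { "jobs" => :jobs_handler }

  def self.handlers_by_queue
    (@handlers || {}).dup
  end
end

snapshot = SnapshotSketch.handlers_by_queue
snapshot["emails"] = :emails_handler # only the copy changes
snapshot.delete("jobs")

registry_keys = SnapshotSketch.handlers_by_queue.keys
```

The copy is shallow: the Hash itself is new, but the handler objects inside it are the same objects the registry holds.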

.install_dispatcher ⇒ Object

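The JavaScript body is written as a single-line backtick literal (see scheduled.rb for the Opal multi-line constraint). The installed function logs and swallows thrown errors so that one bad handler does not crash the Workers queue consumer; on failure it returns a fallback summary with `handled: false` and the error message.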



# File 'lib/homura/runtime/queue.rb', line 404

def self.install_dispatcher
  mod = self
  `globalThis.__HOMURA_QUEUE_DISPATCH__ = async function(js_batch, js_env, js_ctx) { try { return await #{mod}.$dispatch_js(js_batch, js_env, js_ctx); } catch (err) { try { globalThis.console.error('[Cloudflare::QueueConsumer] dispatch failed:', err && err.stack || err); } catch (e) {} return { queue: (js_batch && js_batch.queue) || '', handled: false, size: (js_batch && Array.isArray(js_batch.messages) ? js_batch.messages.length : 0), error: String(err && err.message || err) }; } };(function(){var g=globalThis;g.__OPAL_WORKERS__=g.__OPAL_WORKERS__||{};g.__OPAL_WORKERS__.queue=g.__HOMURA_QUEUE_DISPATCH__;})();`
end
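
The error handling inside that one-line JavaScript wrapper is easier to follow when spread out. The sketch below is a hypothetical plain-Ruby rendition of the same control flow, not code from the library: `guarded_dispatch` and `WrapperBatch` are illustrative names, a block stands in for the call to `dispatch_js`, and `warn` stands in for `console.error`.

```ruby
# Hypothetical Ruby rendition of the JS wrapper's try/catch logic.
def guarded_dispatch(batch)
  yield batch
rescue StandardError => err
  begin
    warn "[sketch] dispatch failed: #{err.class}: #{err.message}"
  rescue StandardError
    # A logging failure must not mask the fallback summary.
  end
  messages = batch.respond_to?(:messages) ? batch.messages : nil
  {
    "queue" => (batch.respond_to?(:queue) && batch.queue) || "",
    "handled" => false,
    "size" => messages.is_a?(Array) ? messages.size : 0,
    "error" => err.message.to_s
  }
end

WrapperBatch = Struct.new(:queue, :messages)

passed = guarded_dispatch(WrapperBatch.new("jobs", [1, 2])) { |_b| { "handled" => true } }
failed = guarded_dispatch(WrapperBatch.new("jobs", [1, 2])) { |_b| raise "boom" }
no_batch = guarded_dispatch(nil) { |_b| raise "boom" }
```

A successful dispatch passes its result through untouched; a raised error becomes a summary with `"handled" => false`, and a missing batch falls back to an empty queue name and a size of 0, matching the defensive checks in the JavaScript.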

.register(queue_name, unbound_method) ⇒ Object



# File 'lib/homura/runtime/queue.rb', line 355

def self.register(queue_name, unbound_method)
  @handlers ||= {}
  @handlers[queue_name.to_s] = unbound_method
end
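
Two consequences of this implementation are worth spelling out: the registry is created lazily, so `handlers` is `nil` until the first `register` call, and registering the same queue name twice keeps only the later handler. The sketch below shows both in a hypothetical stand-in module (`RegistrySketch`); `SketchConsumer` and its two methods are illustrative, standing in for whatever the DSL captures as unbound methods.

```ruby
# Hypothetical stand-in that copies register and the handlers reader.
module RegistrySketch
  class << self
    attr_reader :handlers
  end

  def self.register(queue_name, unbound_method)
    @handlers ||= {}
    @handlers[queue_name.to_s] = unbound_method
  end
end

class SketchConsumer
  def first_handler(_batch)
    :first
  end

  def second_handler(_batch)
    :second
  end
end

initially = RegistrySketch.handlers # nil until the first register call

# Symbol and String names collapse to the same key, so the second call wins.
RegistrySketch.register(:jobs, SketchConsumer.instance_method(:first_handler))
RegistrySketch.register("jobs", SketchConsumer.instance_method(:second_handler))

entry_count = RegistrySketch.handlers.size
winner = RegistrySketch.handlers["jobs"].bind(SketchConsumer.new).call([])
```

Storing an `UnboundMethod` rather than a block is what lets the dispatcher later bind the handler to a fresh context object for each batch.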