Module: Cloudflare::QueueConsumer

Defined in:
lib/homura/runtime/queue.rb

Overview

Consumer dispatcher. The Sinatra DSL (‘consume_queue`) registers handlers here via `Cloudflare::QueueConsumer.register(queue_name, unbound_method)`. `src/worker.mjs#queue` calls `globalThis.HOMURA_QUEUE_DISPATCH` which forwards into `dispatch_js` below.

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.handlersObject (readonly)

Returns the value of attribute handlers.



332
333
334
# File 'lib/homura/runtime/queue.rb', line 332

def handlers
  @handlers
end

Class Method Details

.dispatch_js(js_batch, js_env, js_ctx) ⇒ Object

Dispatcher called from the JS hook. ‘js_batch` is the MessageBatch, `js_env`, `js_ctx` are the Workers env and ExecutionContext. Returns a Hash summary ({ queue:, handled:, size: }) for diagnostics.



351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/homura/runtime/queue.rb', line 351

def self.dispatch_js(js_batch, js_env, js_ctx)
  batch = QueueBatch.new(js_batch)
  queue_name = batch.queue
  handler = handler_for(queue_name)
  if handler.nil?
    warn "[Cloudflare::QueueConsumer] no handler registered for queue #{queue_name.inspect}; messages will time out and retry"
    return(
      {
        "queue" => queue_name,
        "handled" => false,
        "size" => batch.size,
        "reason" => "no_handler"
      }
    )
  end

  ctx = QueueContext.new(batch, js_env, js_ctx)
  result = handler.bind(ctx).call(batch)
  if `(#{result} != null && typeof #{result}.then === 'function')`
    result = result.__await__
  end
  {
    "queue" => queue_name,
    "handled" => true,
    "size" => batch.size,
    "result" => result.is_a?(Hash) ? result : nil
  }
end

.handler_for(queue_name) ⇒ Object



340
341
342
# File 'lib/homura/runtime/queue.rb', line 340

def self.handler_for(queue_name)
  (@handlers || {})[queue_name.to_s]
end

.handlers_by_queueObject



344
345
346
# File 'lib/homura/runtime/queue.rb', line 344

def self.handlers_by_queue
  (@handlers || {}).dup
end

.install_dispatcherObject

Single-line backtick (see scheduled.rb for the Opal multi-line constraint). Logs+swallows thrown errors so one bad handler doesn’t crash the Workers queue consumer.



383
384
385
386
# File 'lib/homura/runtime/queue.rb', line 383

def self.install_dispatcher
  mod = self
  `globalThis.__HOMURA_QUEUE_DISPATCH__ = async function(js_batch, js_env, js_ctx) { try { return await #{mod}.$dispatch_js(js_batch, js_env, js_ctx); } catch (err) { try { globalThis.console.error('[Cloudflare::QueueConsumer] dispatch failed:', err && err.stack || err); } catch (e) {} return { queue: (js_batch && js_batch.queue) || '', handled: false, size: (js_batch && Array.isArray(js_batch.messages) ? js_batch.messages.length : 0), error: String(err && err.message || err) }; } };(function(){var g=globalThis;g.__OPAL_WORKERS__=g.__OPAL_WORKERS__||{};g.__OPAL_WORKERS__.queue=g.__HOMURA_QUEUE_DISPATCH__;})();`
end

.register(queue_name, unbound_method) ⇒ Object



335
336
337
338
# File 'lib/homura/runtime/queue.rb', line 335

def self.register(queue_name, unbound_method)
  @handlers ||= {}
  @handlers[queue_name.to_s] = unbound_method
end