Class: BugBunny::Consumer
- Inherits:
-
Object
- Object
- BugBunny::Consumer
- Includes:
- Observability
- Defined in:
- lib/bug_bunny/consumer.rb
Overview
Consumidor de mensajes AMQP que actúa como un Router RESTful.
Esta clase es el corazón del procesamiento de mensajes en el lado del servidor/worker. Sus responsabilidades son:
-
Escuchar una cola específica.
-
Deserializar el mensaje y sus headers.
-
Consultar el mapa global ‘BugBunny.routes` para enrutar el mensaje a un Controlador.
-
Gestionar el ciclo de respuesta RPC (Request-Response) para evitar timeouts en el cliente.
Constant Summary
Constants included from Observability
Instance Attribute Summary collapse
-
#session ⇒ BugBunny::Session
readonly
La sesión wrapper de RabbitMQ que gestiona el canal.
Class Method Summary collapse
-
.subscribe(connection:, **args) ⇒ BugBunny::Consumer
Método de conveniencia para instanciar y suscribir en un solo paso.
Instance Method Summary collapse
-
#initialize(connection) ⇒ Consumer
constructor
Inicializa un nuevo consumidor.
-
#shutdown ⇒ void
Detiene el health check timer y cierra el canal de forma ordenada.
-
#subscribe(queue_name:, exchange_name:, routing_key:, exchange_type: 'direct', exchange_opts: {}, queue_opts: {}, block: true) ⇒ void
Inicia la suscripción a la cola y comienza el bucle de procesamiento.
Methods included from Observability
Constructor Details
Instance Attribute Details
#session ⇒ BugBunny::Session (readonly)
Returns La sesión wrapper de RabbitMQ que gestiona el canal.
33 34 35 |
# File 'lib/bug_bunny/consumer.rb', line 33 def session @session end |
Class Method Details
.subscribe(connection:, **args) ⇒ BugBunny::Consumer
Método de conveniencia para instanciar y suscribir en un solo paso.
40 41 42 |
# File 'lib/bug_bunny/consumer.rb', line 40 def self.subscribe(connection:, **args) new(connection).subscribe(**args) end |
Instance Method Details
#shutdown ⇒ void
This method returns an undefined value.
Detiene el health check timer y cierra el canal de forma ordenada.
Llamar explícitamente al hacer shutdown del worker (SIGTERM, at_exit, etc.). También se invoca automáticamente cuando ‘subscribe` termina por cualquier motivo.
135 136 137 138 139 140 |
# File 'lib/bug_bunny/consumer.rb', line 135 def shutdown safe_log(:info, 'consumer.shutdown') @health_timer&.shutdown @health_timer = nil session.close end |
#subscribe(queue_name:, exchange_name:, routing_key:, exchange_type: 'direct', exchange_opts: {}, queue_opts: {}, block: true) ⇒ void
This method returns an undefined value.
Inicia la suscripción a la cola y comienza el bucle de procesamiento.
Declara el exchange y la cola (si no existen), realiza el “binding” y se queda escuchando mensajes entrantes.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/bug_bunny/consumer.rb', line 66 def subscribe(queue_name:, exchange_name:, routing_key:, exchange_type: 'direct', exchange_opts: {}, queue_opts: {}, block: true) attempt = 0 begin # Declaración de Infraestructura x = session.exchange(name: exchange_name, type: exchange_type, opts: exchange_opts) q = session.queue(queue_name, queue_opts) q.bind(x, routing_key: routing_key) # 📊 LOGGING DE OBSERVABILIDAD: Calculamos las opciones finales para mostrarlas en consola final_x_opts = BugBunny::Session::DEFAULT_EXCHANGE_OPTIONS .merge(BugBunny.configuration. || {}) .merge(exchange_opts || {}) final_q_opts = BugBunny::Session::DEFAULT_QUEUE_OPTIONS .merge(BugBunny.configuration. || {}) .merge(queue_opts || {}) safe_log(:info, 'consumer.start', queue: queue_name, queue_opts: final_q_opts) safe_log(:info, 'consumer.bound', exchange: exchange_name, exchange_type: exchange_type, routing_key: routing_key, exchange_opts: final_x_opts) start_health_check(queue_name) q.subscribe(manual_ack: true, block: block) do |delivery_info, properties, body| trace_id = properties.correlation_id logger = BugBunny.configuration.logger core = lambda { if logger.respond_to?(:tagged) logger.tagged(trace_id) { (delivery_info, properties, body) } elsif defined?(Rails) && Rails.logger.respond_to?(:tagged) Rails.logger.tagged(trace_id) { (delivery_info, properties, body) } else (delivery_info, properties, body) end } BugBunny.configuration.consumer_middlewares.call(delivery_info, properties, body, &core) end rescue StandardError => e attempt += 1 max_attempts = BugBunny.configuration.max_reconnect_attempts if max_attempts && attempt >= max_attempts safe_log(:error, 'consumer.reconnect_exhausted', max_attempts_count: max_attempts, **(e)) raise end wait = [ BugBunny.configuration.network_recovery_interval * (2**(attempt - 1)), BugBunny.configuration.max_reconnect_interval ].min safe_log(:error, 'consumer.connection_error', attempt_count: attempt, max_attempts_count: max_attempts || 'infinity', retry_in_s: wait, **(e)) sleep wait retry end ensure shutdown end |