Class: BugBunny::Consumer

Inherits:
Object
Includes:
Observability
Defined in:
lib/bug_bunny/consumer.rb

Overview

AMQP message consumer that acts as a RESTful router.

This class is the core of message processing on the server/worker side. Its responsibilities are:

  1. Listen on a specific queue.

  2. Deserialize the message and its headers.

  3. Look up the global `BugBunny.routes` map to route the message to a controller.

  4. Manage the RPC (request-response) reply cycle so that the client does not time out.

Examples:

Manual subscription

connection = BugBunny.create_connection
BugBunny::Consumer.subscribe(
  connection: connection,
  queue_name: 'my_app_queue',
  exchange_name: 'my_exchange',
  routing_key: 'users.#',
  exchange_type: 'topic' # wildcards such as '#' are only interpreted by topic exchanges
)

Constant Summary

Constants included from Observability

Observability::SENSITIVE_KEYS

Instance Method Summary

Methods included from Observability

sensitive_key?

Constructor Details

#initialize(connection) ⇒ Consumer

Initializes a new consumer.

Parameters:

  • connection (Bunny::Session)

    Native Bunny connection.



# File 'lib/bug_bunny/consumer.rb', line 47

def initialize(connection)
  @session = BugBunny::Session.new(connection, publisher_confirms: false)
  @health_timer = nil
  @logger = BugBunny.configuration.logger
end

Instance Attribute Details

#session ⇒ BugBunny::Session (readonly)

Returns the RabbitMQ session wrapper that manages the channel.

Returns:

  • (BugBunny::Session)

    The RabbitMQ session wrapper that manages the channel.



# File 'lib/bug_bunny/consumer.rb', line 33

def session
  @session
end

Class Method Details

.subscribe(connection:, **args) ⇒ BugBunny::Consumer

Convenience method that instantiates a consumer and subscribes it in a single step.

Parameters:

  • connection (Bunny::Session)

    An active TCP connection to RabbitMQ.

  • args (Hash)

    Arguments that are passed on to the #subscribe method.

Returns:

  • (BugBunny::Consumer)



# File 'lib/bug_bunny/consumer.rb', line 40

def self.subscribe(connection:, **args)
  new(connection).subscribe(**args)
end
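
The forwarding shape used by .subscribe can be tried in isolation. The following is a minimal sketch that assumes nothing about BugBunny itself: `SketchConsumer` is a hypothetical stand-in that only mirrors the method signatures, so that the effect of `**args` (every keyword except `connection:` reaches the instance method unchanged) can be observed.

```ruby
# Hypothetical stand-in; it mirrors only the shape of Consumer.subscribe.
class SketchConsumer
  attr_reader :connection, :received

  def self.subscribe(connection:, **args)
    # Build the instance, then forward every remaining keyword untouched.
    new(connection).subscribe(**args)
  end

  def initialize(connection)
    @connection = connection
  end

  def subscribe(queue_name:, exchange_name:, routing_key:, exchange_type: 'direct')
    # Record what arrived so the forwarding can be inspected.
    @received = { queue_name: queue_name, exchange_name: exchange_name,
                  routing_key: routing_key, exchange_type: exchange_type }
    self
  end
end

consumer = SketchConsumer.subscribe(
  connection: :fake_connection,
  queue_name: 'q',
  exchange_name: 'x',
  routing_key: 'users.created'
)
```

Because the required keywords live on the instance method, leaving one out of the class-level call raises ArgumentError from the instance method rather than from the class method.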

Instance Method Details

#shutdown ⇒ void

This method returns an undefined value.

Stops the health check timer and closes the channel in an orderly way.

Call it explicitly when shutting down the worker (SIGTERM, at_exit, etc.). It is also invoked automatically when `subscribe` terminates for any reason.



# File 'lib/bug_bunny/consumer.rb', line 135

def shutdown
  safe_log(:info, 'consumer.shutdown')
  @health_timer&.shutdown
  @health_timer = nil
  session.close
end
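
Since #shutdown can run both explicitly and from the ensure clause of #subscribe, it may be called more than once. The sketch below reproduces only the body shown above with hypothetical stand-ins (`FakeTimer`, `FakeSession`, `SketchWorker`) to show what a second call does: the safe-navigation operator skips the timer once it has been set to nil, while `session.close` is invoked again.

```ruby
# Hypothetical stand-ins for the timer and the session; only the call shape matters.
class FakeTimer
  attr_reader :shutdown_calls

  def initialize
    @shutdown_calls = 0
  end

  def shutdown
    @shutdown_calls += 1
  end
end

class FakeSession
  attr_reader :close_calls

  def initialize
    @close_calls = 0
  end

  def close
    @close_calls += 1
  end
end

class SketchWorker
  attr_reader :session

  def initialize(timer, session)
    @health_timer = timer
    @session = session
  end

  def shutdown
    @health_timer&.shutdown # skipped when the timer is already nil
    @health_timer = nil
    session.close
  end
end

timer = FakeTimer.new
session = FakeSession.new
worker = SketchWorker.new(timer, session)
worker.shutdown # explicit call, e.g. from a signal handler
worker.shutdown # second call, e.g. from the ensure clause
```

In this sketch the timer is stopped once and the session is closed twice. Whether BugBunny::Session#close tolerates repeated calls is not shown in this page, so that part remains an assumption to verify.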

#subscribe(queue_name:, exchange_name:, routing_key:, exchange_type: 'direct', exchange_opts: {}, queue_opts: {}, block: true) ⇒ void

This method returns an undefined value.

Starts the subscription to the queue and enters the processing loop.

Declares the exchange and the queue (if they do not exist), binds them, and then listens for incoming messages.

Parameters:

  • queue_name (String)

    Name of the queue to listen on.

  • exchange_name (String)

    Name of the exchange to bind the queue to.

  • routing_key (String)

    Routing pattern (e.g. 'users.*').

  • exchange_type (String) (defaults to: 'direct')

    Exchange type ('direct', 'topic', 'fanout').

  • exchange_opts (Hash) (defaults to: {})

    Additional options for the exchange (durable, auto_delete).

  • queue_opts (Hash) (defaults to: {})

    Additional options for the queue (durable, auto_delete).

  • block (Boolean) (defaults to: true)

    If `true`, blocks the current thread (infinite loop).
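
How routing_key is interpreted depends on exchange_type. On a topic exchange, '*' stands for exactly one dot-separated word and '#' for zero or more words; a direct exchange compares the key literally, and a fanout exchange ignores it. The matching itself is done by the broker, not by this class. The helper below (`topic_match?`, a hypothetical name) is only a local sketch of the topic rules, useful for checking a pattern before binding.

```ruby
# Returns true when an AMQP topic binding pattern matches a routing key.
# '*' stands for exactly one dot-separated word, '#' for zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

def match_words(pattern_words, key_words)
  # An exhausted pattern matches only an exhausted key.
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when '#'
    # '#' may consume 0..n words; try every possible split.
    (0..key_words.length).any? { |n| match_words(rest, key_words.drop(n)) }
  when '*'
    # '*' must consume exactly one word.
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    # A literal word must be equal to the next word of the key.
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

topic_match?('users.*', 'users.created')    # one word after "users"
topic_match?('users.*', 'users.created.v2') # two words, so '*' is not enough
topic_match?('users.#', 'users.created.v2') # '#' absorbs both words
```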



# File 'lib/bug_bunny/consumer.rb', line 66

def subscribe(queue_name:, exchange_name:, routing_key:, exchange_type: 'direct', exchange_opts: {},
              queue_opts: {}, block: true)
  attempt = 0

  begin
    # Infrastructure declaration
    x = session.exchange(name: exchange_name, type: exchange_type, opts: exchange_opts)
    q = session.queue(queue_name, queue_opts)
    q.bind(x, routing_key: routing_key)

    # 📊 OBSERVABILITY LOGGING: compute the final options so they can be shown in the console
    final_x_opts = BugBunny::Session::DEFAULT_EXCHANGE_OPTIONS
                   .merge(BugBunny.configuration.exchange_options || {})
                   .merge(exchange_opts || {})
    final_q_opts = BugBunny::Session::DEFAULT_QUEUE_OPTIONS
                   .merge(BugBunny.configuration.queue_options || {})
                   .merge(queue_opts || {})

    safe_log(:info, 'consumer.start', queue: queue_name, queue_opts: final_q_opts)
    safe_log(:info, 'consumer.bound', exchange: exchange_name, exchange_type: exchange_type,
                                      routing_key: routing_key, exchange_opts: final_x_opts)

    start_health_check(queue_name)

    q.subscribe(manual_ack: true, block: block) do |delivery_info, properties, body|
      trace_id = properties.correlation_id
      logger = BugBunny.configuration.logger

      core = lambda {
        if logger.respond_to?(:tagged)
          logger.tagged(trace_id) { process_message(delivery_info, properties, body) }
        elsif defined?(Rails) && Rails.logger.respond_to?(:tagged)
          Rails.logger.tagged(trace_id) { process_message(delivery_info, properties, body) }
        else
          process_message(delivery_info, properties, body)
        end
      }

      BugBunny.configuration.consumer_middlewares.call(delivery_info, properties, body, &core)
    end
  rescue StandardError => e
    attempt += 1
    max_attempts = BugBunny.configuration.max_reconnect_attempts

    if max_attempts && attempt >= max_attempts
      safe_log(:error, 'consumer.reconnect_exhausted', max_attempts_count: max_attempts, **(e))
      raise
    end

    wait = [
      BugBunny.configuration.network_recovery_interval * (2**(attempt - 1)),
      BugBunny.configuration.max_reconnect_interval
    ].min

    safe_log(:error, 'consumer.connection_error', attempt_count: attempt,
                                                  max_attempts_count: max_attempts || 'infinity', retry_in_s: wait, **(e))
    sleep wait
    retry
  end
ensure
  shutdown
end
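
The rescue branch above waits with a capped exponential backoff before retrying. As a minimal sketch, the same arithmetic can be isolated in a pure function; `reconnect_wait` is a hypothetical helper, and the values 5 and 60 are illustrative, not the library's defaults.

```ruby
# Delay before retry number `attempt` (1-based):
# base_interval * 2^(attempt - 1), capped at max_interval.
def reconnect_wait(attempt, base_interval:, max_interval:)
  [base_interval * (2**(attempt - 1)), max_interval].min
end

# Illustrative values only: a 5-second base interval and a 60-second cap.
waits = (1..6).map { |attempt| reconnect_wait(attempt, base_interval: 5, max_interval: 60) }
# waits => [5, 10, 20, 40, 60, 60]
```

Note that the listing compares `attempt >= max_attempts` before sleeping, so with max_reconnect_attempts set to 3 the third failure is re-raised and at most two waits take place. When max_reconnect_attempts is nil, the loop retries indefinitely.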