Class: BugBunny::Session Private
- Inherits:
-
Object
- Object
- BugBunny::Session
- Includes:
- Observability
- Defined in:
- lib/bug_bunny/session.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Clase interna que encapsula una unidad de trabajo sobre una conexión RabbitMQ.
Implementa la lógica de “Configuración en Cascada” para Exchanges y Colas, gestionando el ciclo de vida de un ‘Bunny::Channel` con resiliencia y carga perezosa.
Opciones por Defecto (Nivel 1: Gema) collapse
- DEFAULT_EXCHANGE_OPTIONS =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Opciones predeterminadas de la gema para Exchanges.
{ durable: false, auto_delete: false }.freeze
- DEFAULT_QUEUE_OPTIONS =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Opciones predeterminadas de la gema para Colas.
‘durable: true, exclusive: false, auto_delete: false` es el patrón “queue compartida duradera” — sobrevive restart del broker, múltiples consumers (worker pool) pueden consumir, no se elimina cuando el último consumer se desconecta.
Histórico: hasta 4.15.x el default era ‘{ exclusive: false, durable: false, auto_delete: true }` (combo `transient_nonexcl_queues` deprecada en RabbitMQ 4.x). Ver issue #42 para detalles de la migración. Para restaurar el comportamiento anterior, configurar explícitamente:
BugBunny.configure do |c| c. = { exclusive: false, durable: false, auto_delete: true } end { exclusive: false, durable: true, auto_delete: false }.freeze
Constants included from Observability
Instance Attribute Summary collapse
-
#connection ⇒ Bunny::Session
readonly
private
La conexión TCP subyacente.
Instance Method Summary collapse
-
#channel ⇒ Bunny::Channel
private
Obtiene el canal actual o crea uno nuevo si es necesario.
-
#close ⇒ void
private
Cierra el canal asociado a esta sesión de forma segura.
-
#exchange(name: nil, type: 'direct', opts: {}) ⇒ Bunny::Exchange
private
Factory method para declarar o recuperar un Exchange aplicando la cascada de configuración.
-
#initialize(connection, publisher_confirms: true) ⇒ Session
constructor
private
Inicializa una nueva sesión sin abrir canales todavía.
-
#queue(name, opts = {}) ⇒ Bunny::Queue
private
Factory method para declarar o recuperar una Cola aplicando la cascada de configuración.
-
#register_return_listener(cid) ⇒ Array(Concurrent::Event, Hash)
private
Registra interés en una eventual señal ‘basic.return` correlacionada con `cid`.
-
#unregister_return_listener(cid) ⇒ void
private
Elimina el listener registrado por #register_return_listener.
Methods included from Observability
Constructor Details
#initialize(connection, publisher_confirms: true) ⇒ Session
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Inicializa una nueva sesión sin abrir canales todavía.
47 48 49 50 51 52 53 54 55 |
# File 'lib/bug_bunny/session.rb', line 47 def initialize(connection, publisher_confirms: true) @connection = connection @publisher_confirms = publisher_confirms @channel = nil @channel_mutex = Mutex.new @logger = BugBunny.configuration.logger @configured_returns = {} @pending_returns = Concurrent::Map.new end |
Instance Attribute Details
#connection ⇒ Bunny::Session (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns La conexión TCP subyacente.
39 40 41 |
# File 'lib/bug_bunny/session.rb', line 39 def connection @connection end |
Instance Method Details
#channel ⇒ Bunny::Channel
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Obtiene el canal actual o crea uno nuevo si es necesario.
Este método es el punto central de la robustez. Verifica la salud de la conexión y del canal antes de devolverlo.
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/bug_bunny/session.rb', line 92 def channel # Fast path: canal abierto, sin adquirir el mutex. return @channel if @channel&.open? # Slow path: adquirimos el mutex y verificamos de nuevo (double-checked locking). # Evita que múltiples threads creen canales simultáneamente cuando el canal cae. @channel_mutex.synchronize do return @channel if @channel&.open? ensure_connection! create_channel! end @channel end |
#close ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Cierra el canal asociado a esta sesión de forma segura.
154 155 156 157 158 159 160 161 |
# File 'lib/bug_bunny/session.rb', line 154 def close @channel_mutex.synchronize do @channel&.close if @channel&.open? @channel = nil @configured_returns.clear release_pending_returns! end end |
#exchange(name: nil, type: 'direct', opts: {}) ⇒ Bunny::Exchange
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Factory method para declarar o recuperar un Exchange aplicando la cascada de configuración.
Jerarquía de fusión:
-
Defaults de la gema (‘DEFAULT_EXCHANGE_OPTIONS`)
-
Configuración global (‘BugBunny.configuration.exchange_options`)
-
Opciones específicas pasadas como argumento (‘opts`)
119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/bug_bunny/session.rb', line 119 def exchange(name: nil, type: 'direct', opts: {}) return channel.default_exchange if name.nil? || name.empty? # Aplicación de la lógica de fusión en cascada merged_opts = DEFAULT_EXCHANGE_OPTIONS .merge(BugBunny.configuration. || {}) .merge(opts) # public_send permite llamar a :topic, :direct, etc. dinámicamente según el tipo x = channel.public_send(type.to_s, name.to_s, merged_opts) register_on_return!(x) if @publisher_confirms x end |
#queue(name, opts = {}) ⇒ Bunny::Queue
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Factory method para declarar o recuperar una Cola aplicando la cascada de configuración.
Jerarquía de fusión:
-
Defaults de la gema (‘DEFAULT_QUEUE_OPTIONS`)
-
Configuración global (‘BugBunny.configuration.queue_options`)
-
Opciones específicas pasadas como argumento (‘opts`)
143 144 145 146 147 148 149 150 |
# File 'lib/bug_bunny/session.rb', line 143 def queue(name, opts = {}) # Aplicación de la lógica de fusión en cascada merged_opts = DEFAULT_QUEUE_OPTIONS .merge(BugBunny.configuration. || {}) .merge(opts) channel.queue(name.to_s, merged_opts) end |
#register_return_listener(cid) ⇒ Array(Concurrent::Event, Hash)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registra interés en una eventual señal ‘basic.return` correlacionada con `cid`.
Devuelve un par ‘(event, slot)`. El caller espera el `event` tras `wait_for_confirms`; si el broker retorna el mensaje, #handle_broker_return setea el event y deposita la `return_info` en `slot` antes de invocar el callback global del usuario.
El caller es responsable de invocar #unregister_return_listener en un ‘ensure` para evitar fugas en el registry interno.
70 71 72 73 74 |
# File 'lib/bug_bunny/session.rb', line 70 def register_return_listener(cid) slot = { event: Concurrent::Event.new, info: nil } @pending_returns[cid.to_s] = slot [slot[:event], slot] end |
#unregister_return_listener(cid) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Elimina el listener registrado por #register_return_listener.
81 82 83 |
# File 'lib/bug_bunny/session.rb', line 81 def unregister_return_listener(cid) @pending_returns.delete(cid.to_s) end |