Class: BugBunny::Session Private

Inherits:
Object
  • Object
show all
Includes:
Observability
Defined in:
lib/bug_bunny/session.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Clase interna que encapsula una unidad de trabajo sobre una conexión RabbitMQ.

Implementa la lógica de “Configuración en Cascada” para Exchanges y Colas, gestionando el ciclo de vida de un ‘Bunny::Channel` con resiliencia y carga perezosa.

Opciones por Defecto (Nivel 1: Gema) collapse

DEFAULT_EXCHANGE_OPTIONS =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Opciones predeterminadas de la gema para Exchanges.

{ durable: false, auto_delete: false }.freeze
DEFAULT_QUEUE_OPTIONS =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Opciones predeterminadas de la gema para Colas.

‘durable: true, exclusive: false, auto_delete: false` es el patrón “queue compartida duradera” — sobrevive restart del broker, múltiples consumers (worker pool) pueden consumir, no se elimina cuando el último consumer se desconecta.

Histórico: hasta 4.15.x el default era ‘{ exclusive: false, durable: false, auto_delete: true }` (combo `transient_nonexcl_queues` deprecada en RabbitMQ 4.x). Ver issue #42 para detalles de la migración. Para restaurar el comportamiento anterior, configurar explícitamente:

BugBunny.configure do |c|
  c.queue_options = { exclusive: false, durable: false, auto_delete: true }
end
{ exclusive: false, durable: true, auto_delete: false }.freeze

Constants included from Observability

Observability::SENSITIVE_KEYS

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Observability

sensitive_key?

Constructor Details

#initialize(connection, publisher_confirms: true) ⇒ Session

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Inicializa una nueva sesión sin abrir canales todavía.

Parameters:

  • connection (Bunny::Session)

    Una conexión (puede estar abierta o cerrada temporalmente).

  • publisher_confirms (Boolean) (defaults to: true)

    Si es ‘true`, el canal se abre en modo Publisher Confirms. Activar solo en sesiones de Producer. En sesiones de Consumer genera overhead innecesario ya que los replies RPC son fire-and-forget desde la perspectiva del servidor.



47
48
49
50
51
52
53
54
55
# File 'lib/bug_bunny/session.rb', line 47

def initialize(connection, publisher_confirms: true)
  @connection = connection
  @publisher_confirms = publisher_confirms
  @channel = nil
  @channel_mutex = Mutex.new
  @logger = BugBunny.configuration.logger
  @configured_returns = {}
  @pending_returns = Concurrent::Map.new
end

Instance Attribute Details

#connectionBunny::Session (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns La conexión TCP subyacente.

Returns:

  • (Bunny::Session)

    La conexión TCP subyacente.



39
40
41
# File 'lib/bug_bunny/session.rb', line 39

def connection
  @connection
end

Instance Method Details

#channelBunny::Channel

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Obtiene el canal actual o crea uno nuevo si es necesario.

Este método es el punto central de la robustez. Verifica la salud de la conexión y del canal antes de devolverlo.

Returns:

  • (Bunny::Channel)

    Un canal abierto y configurado.

Raises:



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/bug_bunny/session.rb', line 92

def channel
  # Fast path: canal abierto, sin adquirir el mutex.
  return @channel if @channel&.open?

  # Slow path: adquirimos el mutex y verificamos de nuevo (double-checked locking).
  # Evita que múltiples threads creen canales simultáneamente cuando el canal cae.
  @channel_mutex.synchronize do
    return @channel if @channel&.open?

    ensure_connection!
    create_channel!
  end

  @channel
end

#closevoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Cierra el canal asociado a esta sesión de forma segura.



154
155
156
157
158
159
160
161
# File 'lib/bug_bunny/session.rb', line 154

def close
  @channel_mutex.synchronize do
    @channel&.close if @channel&.open?
    @channel = nil
    @configured_returns.clear
    release_pending_returns!
  end
end

#exchange(name: nil, type: 'direct', opts: {}) ⇒ Bunny::Exchange

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Factory method para declarar o recuperar un Exchange aplicando la cascada de configuración.

Jerarquía de fusión:

  1. Defaults de la gema (‘DEFAULT_EXCHANGE_OPTIONS`)

  2. Configuración global (‘BugBunny.configuration.exchange_options`)

  3. Opciones específicas pasadas como argumento (‘opts`)

Parameters:

  • name (String, nil) (defaults to: nil)

    Nombre del exchange.

  • type (String, Symbol) (defaults to: 'direct')

    Tipo de exchange (‘direct’, ‘topic’, ‘fanout’).

  • opts (Hash) (defaults to: {})

    Opciones específicas de infraestructura para este intercambio.

Returns:

  • (Bunny::Exchange)

    El objeto exchange de Bunny configurado.



119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/bug_bunny/session.rb', line 119

def exchange(name: nil, type: 'direct', opts: {})
  return channel.default_exchange if name.nil? || name.empty?

  # Aplicación de la lógica de fusión en cascada
  merged_opts = DEFAULT_EXCHANGE_OPTIONS
                .merge(BugBunny.configuration.exchange_options || {})
                .merge(opts)

  # public_send permite llamar a :topic, :direct, etc. dinámicamente según el tipo
  x = channel.public_send(type.to_s, name.to_s, merged_opts)
  register_on_return!(x) if @publisher_confirms
  x
end

#queue(name, opts = {}) ⇒ Bunny::Queue

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Factory method para declarar o recuperar una Cola aplicando la cascada de configuración.

Jerarquía de fusión:

  1. Defaults de la gema (‘DEFAULT_QUEUE_OPTIONS`)

  2. Configuración global (‘BugBunny.configuration.queue_options`)

  3. Opciones específicas pasadas como argumento (‘opts`)

Parameters:

  • name (String)

    Nombre de la cola.

  • opts (Hash) (defaults to: {})

    Opciones específicas de infraestructura para esta cola.

Returns:

  • (Bunny::Queue)

    El objeto cola de Bunny configurado.



143
144
145
146
147
148
149
150
# File 'lib/bug_bunny/session.rb', line 143

def queue(name, opts = {})
  # Aplicación de la lógica de fusión en cascada
  merged_opts = DEFAULT_QUEUE_OPTIONS
                .merge(BugBunny.configuration.queue_options || {})
                .merge(opts)

  channel.queue(name.to_s, merged_opts)
end

#register_return_listener(cid) ⇒ Array(Concurrent::Event, Hash)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registra interés en una eventual señal ‘basic.return` correlacionada con `cid`.

Devuelve un par ‘(event, slot)`. El caller espera el `event` tras `wait_for_confirms`; si el broker retorna el mensaje, #handle_broker_return setea el event y deposita la `return_info` en `slot` antes de invocar el callback global del usuario.

El caller es responsable de invocar #unregister_return_listener en un ‘ensure` para evitar fugas en el registry interno.

Parameters:

  • cid (String)

    Correlation ID del request en curso.

Returns:

  • (Array(Concurrent::Event, Hash))

    Tupla ‘[event, slot]`. `slot` será poblado con la `Bunny::ReturnInfo` si el broker retorna el mensaje.



70
71
72
73
74
# File 'lib/bug_bunny/session.rb', line 70

def register_return_listener(cid)
  slot = { event: Concurrent::Event.new, info: nil }
  @pending_returns[cid.to_s] = slot
  [slot[:event], slot]
end

#unregister_return_listener(cid) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Elimina el listener registrado por #register_return_listener.

Parameters:

  • cid (String)


81
82
83
# File 'lib/bug_bunny/session.rb', line 81

def unregister_return_listener(cid)
  @pending_returns.delete(cid.to_s)
end