Class: BugBunny::Producer

Inherits:
Object
  • Object
show all
Includes:
Observability
Defined in:
lib/bug_bunny/producer.rb

Overview

Clase de bajo nivel encargada de la publicación de mensajes en RabbitMQ.

Actúa como el “motor” de envío del framework. Es responsable de:

  1. Serializar el payload del mensaje.

  2. Manejar la publicación asíncrona (Fire-and-Forget).

  3. Implementar el patrón RPC síncrono utilizando futuros (‘Concurrent::IVar`).

  4. Gestionar la escucha de respuestas en la cola especial de RabbitMQ.

Constant Summary collapse

RETURN_RACE_WINDOW_S =

Bound de espera (segundos) que el publish thread aplica tras un ack positivo para tolerar el scheduling race entre reader thread (donde Bunny invoca ‘on_return`) y publish thread (donde se devuelve `wait_for_confirms`). AMQP garantiza que el `basic.return` precede al `basic.ack` en la wire, así que en la práctica el event ya está seteado al llegar acá; este wait es defensa contra GVL.

0.05

Constants included from Observability

Observability::SENSITIVE_KEYS

Instance Method Summary collapse

Methods included from Observability

sensitive_key?

Constructor Details

#initialize(session) ⇒ Producer

Inicializa el productor.

Prepara las estructuras de concurrencia necesarias para manejar múltiples peticiones RPC simultáneas sobre la misma conexión.

Parameters:



31
32
33
34
35
36
37
38
# File 'lib/bug_bunny/producer.rb', line 31

def initialize(session)
  @session = session
  @logger = BugBunny.configuration.logger
  # Mapa thread-safe para correlacionar IDs de petición con sus futuros (IVars)
  @pending_requests = Concurrent::Map.new
  @reply_listener_mutex = Mutex.new
  @reply_listener_started = false
end

Instance Method Details

#confirmed(request) ⇒ Hash

Envía un mensaje con Publisher Confirms síncronos (Fire-and-Forget confirmado).

Publica el mensaje y bloquea el hilo actual hasta que el broker confirme su recepción vía ‘wait_for_confirms`. Soporta `mandatory: true` con callback `on_return` para mensajes que no pudieron rutearse.

A diferencia de #rpc (que espera la respuesta de un Consumer remoto), aquí solo se espera el ACK del propio broker — no hay round-trip al servicio destino.

Parameters:

  • request (BugBunny::Request)

    Request con ‘mandatory`, `confirm_timeout`, `nack_raise` y/o `on_return` opcionales.

Returns:

  • (Hash)

    ‘{ ’status’ => 202, ‘body’ => nil }‘ si el broker confirmó la recepción.

Raises:



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/bug_bunny/producer.rb', line 72

def confirmed(request)
  return_listener = nil
  return_listener = setup_return_listener(request)
  started_at = monotonic_now
  publish_duration = publish_message(request)

  confirm_started_at = monotonic_now
  acked = wait_for_confirms!(request)
  confirm_duration = duration_s(confirm_started_at)

  handle_confirm_result(request, acked)
  handle_return_result(request, return_listener)
  log_confirmed(request, publish_duration, confirm_duration, started_at)

  { 'status' => 202, 'body' => nil }
rescue BugBunny::Error
  raise
rescue StandardError => e
  raise BugBunny::CommunicationError, "Publisher confirms failed: #{e.message}"
ensure
  teardown_return_listener(request, return_listener)
end

#fire(request) ⇒ Hash

Envía un mensaje de forma asíncrona (Fire-and-Forget).

Serializa el cuerpo del request, resuelve el exchange aplicando la cascada de configuración y publica el mensaje sin esperar respuesta.

Parameters:

  • request (BugBunny::Request)

    Objeto con la configuración del envío (body, exchange_options, etc).

Returns:

  • (Hash)

    Un hash de éxito simbólico ({ ‘status’ => 202 }).



47
48
49
50
51
# File 'lib/bug_bunny/producer.rb', line 47

def fire(request)
  publish_message(request)
  # Devolvemos un hash para evitar NoMethodError en el cliente (que espera una respuesta tipo Hash)
  { 'status' => 202, 'body' => nil }
end

#rpc(request) ⇒ Hash

Envía un mensaje y bloquea el hilo actual esperando una respuesta (RPC).

Implementa el mecanismo “Direct Reply-to” de RabbitMQ (‘amq.rabbitmq.reply-to`).

Parameters:

Returns:

  • (Hash)

    El cuerpo de la respuesta parseado desde JSON.

Raises:



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/bug_bunny/producer.rb', line 103

def rpc(request)
  ensure_reply_listener!

  request.correlation_id ||= SecureRandom.uuid
  request.reply_to = 'amq.rabbitmq.reply-to'
  wait_timeout = request.timeout || BugBunny.configuration.rpc_timeout
  cid = request.correlation_id.to_s

  # Creamos un futuro (IVar) que actuará como semáforo
  future = Concurrent::IVar.new
  @pending_requests[cid] = future

  started_at = monotonic_now
  begin
    fire(request)

    safe_log(:debug, 'producer.rpc_waiting', messaging_message_id: cid, timeout_s: wait_timeout)

    # Bloqueamos el hilo aquí hasta que llegue la respuesta o expire el timeout
    result = future.value(wait_timeout)

    raise BugBunny::RequestTimeout, "Timeout waiting for RPC: #{request.path} [#{request.method}]" if result.nil?

    BugBunny.configuration.on_rpc_reply&.call(result[:headers])
    log_rpc_response_received(request, cid, result, started_at)

    parse_response(result[:body])
  ensure
    # Limpieza vital para evitar fugas de memoria en el mapa
    @pending_requests.delete(cid)
  end
end