Class: BugBunny::Producer

Inherits:
Object
  • Object
show all
Includes:
Observability
Defined in:
lib/bug_bunny/producer.rb

Overview

Clase de bajo nivel encargada de la publicación de mensajes en RabbitMQ.

Actúa como el “motor” de envío del framework. Es responsable de:

  1. Serializar el payload del mensaje.

  2. Manejar la publicación asíncrona (Fire-and-Forget).

  3. Implementar el patrón RPC síncrono utilizando futuros (‘Concurrent::IVar`).

  4. Gestionar la escucha de respuestas en la cola especial de RabbitMQ.

Constant Summary

Constants included from Observability

Observability::SENSITIVE_KEYS

Instance Method Summary collapse

Methods included from Observability

sensitive_key?

Constructor Details

#initialize(session) ⇒ Producer

Inicializa el productor.

Prepara las estructuras de concurrencia necesarias para manejar múltiples peticiones RPC simultáneas sobre la misma conexión.

Parameters:



24
25
26
27
28
29
30
31
# File 'lib/bug_bunny/producer.rb', line 24

def initialize(session)
  @session = session
  @logger = BugBunny.configuration.logger
  # Mapa thread-safe para correlacionar IDs de petición con sus futuros (IVars)
  @pending_requests = Concurrent::Map.new
  @reply_listener_mutex = Mutex.new
  @reply_listener_started = false
end

Instance Method Details

#confirmed(request) ⇒ Hash

Envía un mensaje con Publisher Confirms síncronos (Fire-and-Forget confirmado).

Publica el mensaje y bloquea el hilo actual hasta que el broker confirme su recepción vía ‘wait_for_confirms`. Soporta `mandatory: true` con callback `on_return` para mensajes que no pudieron rutearse.

A diferencia de #rpc (que espera la respuesta de un Consumer remoto), aquí solo se espera el ACK del propio broker — no hay round-trip al servicio destino.

Parameters:

  • request (BugBunny::Request)

    Request con ‘mandatory`, `confirm_timeout`, `nack_raise` y/o `on_return` opcionales.

Returns:

  • (Hash)

    ‘{ ’status’ => 202, ‘body’ => nil }‘ si el broker confirmó la recepción.

Raises:



62
63
64
65
66
67
68
69
70
71
# File 'lib/bug_bunny/producer.rb', line 62

def confirmed(request)
  publish_message(request)
  acked = wait_for_confirms!(request)
  handle_confirm_result(request, acked)
  { 'status' => 202, 'body' => nil }
rescue BugBunny::Error
  raise
rescue StandardError => e
  raise BugBunny::CommunicationError, "Publisher confirms failed: #{e.message}"
end

#fire(request) ⇒ Hash

Envía un mensaje de forma asíncrona (Fire-and-Forget).

Serializa el cuerpo del request, resuelve el exchange aplicando la cascada de configuración y publica el mensaje sin esperar respuesta.

Parameters:

  • request (BugBunny::Request)

    Objeto con la configuración del envío (body, exchange_options, etc).

Returns:

  • (Hash)

    Un hash de éxito simbólico ({ ‘status’ => 202 }).



40
41
42
43
44
# File 'lib/bug_bunny/producer.rb', line 40

def fire(request)
  publish_message(request)
  # Devolvemos un hash para evitar NoMethodError en el cliente (que espera una respuesta tipo Hash)
  { 'status' => 202, 'body' => nil }
end

#rpc(request) ⇒ Hash

Envía un mensaje y bloquea el hilo actual esperando una respuesta (RPC).

Implementa el mecanismo “Direct Reply-to” de RabbitMQ (‘amq.rabbitmq.reply-to`).

Parameters:

Returns:

  • (Hash)

    El cuerpo de la respuesta parseado desde JSON.

Raises:



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/bug_bunny/producer.rb', line 81

def rpc(request)
  ensure_reply_listener!

  request.correlation_id ||= SecureRandom.uuid
  request.reply_to = 'amq.rabbitmq.reply-to'
  wait_timeout = request.timeout || BugBunny.configuration.rpc_timeout
  cid = request.correlation_id.to_s

  # Creamos un futuro (IVar) que actuará como semáforo
  future = Concurrent::IVar.new
  @pending_requests[cid] = future

  begin
    fire(request)

    safe_log(:debug, 'producer.rpc_waiting', messaging_message_id: cid, timeout_s: wait_timeout)

    # Bloqueamos el hilo aquí hasta que llegue la respuesta o expire el timeout
    result = future.value(wait_timeout)

    raise BugBunny::RequestTimeout, "Timeout waiting for RPC: #{request.path} [#{request.method}]" if result.nil?

    BugBunny.configuration.on_rpc_reply&.call(result[:headers])

    safe_log(:debug, 'producer.rpc_response_received',
             messaging_system: 'rabbitmq', messaging_operation: 'receive', messaging_message_id: cid,
             response_body: result[:body]&.truncate(500),
             response_headers: result[:headers]&.to_json&.truncate(300))

    parse_response(result[:body])
  ensure
    # Limpieza vital para evitar fugas de memoria en el mapa
    @pending_requests.delete(cid)
  end
end