Class: BugBunny::Producer
- Inherits:
-
Object
- Object
- BugBunny::Producer
- Includes:
- Observability
- Defined in:
- lib/bug_bunny/producer.rb
Overview
Clase de bajo nivel encargada de la publicación de mensajes en RabbitMQ.
Actúa como el “motor” de envío del framework. Es responsable de:
-
Serializar el payload del mensaje.
-
Manejar la publicación asíncrona (Fire-and-Forget).
-
Implementar el patrón RPC síncrono utilizando futuros (‘Concurrent::IVar`).
-
Gestionar la escucha de respuestas en la cola especial de RabbitMQ.
Constant Summary
Constants included from Observability
Instance Method Summary collapse
-
#confirmed(request) ⇒ Hash
Envía un mensaje con Publisher Confirms síncronos (Fire-and-Forget confirmado).
-
#fire(request) ⇒ Hash
Envía un mensaje de forma asíncrona (Fire-and-Forget).
-
#initialize(session) ⇒ Producer
constructor
Inicializa el productor.
-
#rpc(request) ⇒ Hash
Envía un mensaje y bloquea el hilo actual esperando una respuesta (RPC).
Methods included from Observability
Constructor Details
#initialize(session) ⇒ Producer
Inicializa el productor.
Prepara las estructuras de concurrencia necesarias para manejar múltiples peticiones RPC simultáneas sobre la misma conexión.
24 25 26 27 28 29 30 31 |
# File 'lib/bug_bunny/producer.rb', line 24 def initialize(session) @session = session @logger = BugBunny.configuration.logger # Mapa thread-safe para correlacionar IDs de petición con sus futuros (IVars) @pending_requests = Concurrent::Map.new @reply_listener_mutex = Mutex.new @reply_listener_started = false end |
Instance Method Details
#confirmed(request) ⇒ Hash
Envía un mensaje con Publisher Confirms síncronos (Fire-and-Forget confirmado).
Publica el mensaje y bloquea el hilo actual hasta que el broker confirme su recepción vía ‘wait_for_confirms`. Soporta `mandatory: true` con callback `on_return` para mensajes que no pudieron rutearse.
A diferencia de #rpc (que espera la respuesta de un Consumer remoto), aquí solo se espera el ACK del propio broker — no hay round-trip al servicio destino.
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/bug_bunny/producer.rb', line 62 def confirmed(request) (request) acked = wait_for_confirms!(request) handle_confirm_result(request, acked) { 'status' => 202, 'body' => nil } rescue BugBunny::Error raise rescue StandardError => e raise BugBunny::CommunicationError, "Publisher confirms failed: #{e.}" end |
#fire(request) ⇒ Hash
Envía un mensaje de forma asíncrona (Fire-and-Forget).
Serializa el cuerpo del request, resuelve el exchange aplicando la cascada de configuración y publica el mensaje sin esperar respuesta.
40 41 42 43 44 |
# File 'lib/bug_bunny/producer.rb', line 40 def fire(request) (request) # Devolvemos un hash para evitar NoMethodError en el cliente (que espera una respuesta tipo Hash) { 'status' => 202, 'body' => nil } end |
#rpc(request) ⇒ Hash
Envía un mensaje y bloquea el hilo actual esperando una respuesta (RPC).
Implementa el mecanismo “Direct Reply-to” de RabbitMQ (‘amq.rabbitmq.reply-to`).
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/bug_bunny/producer.rb', line 81 def rpc(request) ensure_reply_listener! request.correlation_id ||= SecureRandom.uuid request.reply_to = 'amq.rabbitmq.reply-to' wait_timeout = request.timeout || BugBunny.configuration.rpc_timeout cid = request.correlation_id.to_s # Creamos un futuro (IVar) que actuará como semáforo future = Concurrent::IVar.new @pending_requests[cid] = future begin fire(request) safe_log(:debug, 'producer.rpc_waiting', messaging_message_id: cid, timeout_s: wait_timeout) # Bloqueamos el hilo aquí hasta que llegue la respuesta o expire el timeout result = future.value(wait_timeout) raise BugBunny::RequestTimeout, "Timeout waiting for RPC: #{request.path} [#{request.method}]" if result.nil? BugBunny.configuration.on_rpc_reply&.call(result[:headers]) safe_log(:debug, 'producer.rpc_response_received', messaging_system: 'rabbitmq', messaging_operation: 'receive', messaging_message_id: cid, response_body: result[:body]&.truncate(500), response_headers: result[:headers]&.to_json&.truncate(300)) parse_response(result[:body]) ensure # Limpieza vital para evitar fugas de memoria en el mapa @pending_requests.delete(cid) end end |