Class: BugBunny::Producer
- Inherits:
-
Object
- Object
- BugBunny::Producer
- Includes:
- Observability
- Defined in:
- lib/bug_bunny/producer.rb
Overview
Clase de bajo nivel encargada de la publicación de mensajes en RabbitMQ.
Actúa como el “motor” de envío del framework. Es responsable de:
-
Serializar el payload del mensaje.
-
Manejar la publicación asíncrona (Fire-and-Forget).
-
Implementar el patrón RPC síncrono utilizando futuros (‘Concurrent::IVar`).
-
Gestionar la escucha de respuestas en la cola especial de RabbitMQ.
Constant Summary collapse
- RETURN_RACE_WINDOW_S =
Bound de espera (segundos) que el publish thread aplica tras un ack positivo para tolerar el scheduling race entre reader thread (donde Bunny invoca ‘on_return`) y publish thread (donde se devuelve `wait_for_confirms`). AMQP garantiza que el `basic.return` precede al `basic.ack` en la wire, así que en la práctica el event ya está seteado al llegar acá; este wait es defensa contra GVL.
0.05
Constants included from Observability
Instance Method Summary collapse
-
#confirmed(request) ⇒ Hash
Envía un mensaje con Publisher Confirms síncronos (Fire-and-Forget confirmado).
-
#fire(request) ⇒ Hash
Envía un mensaje de forma asíncrona (Fire-and-Forget).
-
#initialize(session) ⇒ Producer
constructor
Inicializa el productor.
-
#rpc(request) ⇒ Hash
Envía un mensaje y bloquea el hilo actual esperando una respuesta (RPC).
Methods included from Observability
Constructor Details
#initialize(session) ⇒ Producer
Inicializa el productor.
Prepara las estructuras de concurrencia necesarias para manejar múltiples peticiones RPC simultáneas sobre la misma conexión.
31 32 33 34 35 36 37 38 |
# File 'lib/bug_bunny/producer.rb', line 31 def initialize(session) @session = session @logger = BugBunny.configuration.logger # Mapa thread-safe para correlacionar IDs de petición con sus futuros (IVars) @pending_requests = Concurrent::Map.new @reply_listener_mutex = Mutex.new @reply_listener_started = false end |
Instance Method Details
#confirmed(request) ⇒ Hash
Envía un mensaje con Publisher Confirms síncronos (Fire-and-Forget confirmado).
Publica el mensaje y bloquea el hilo actual hasta que el broker confirme su recepción vía ‘wait_for_confirms`. Soporta `mandatory: true` con callback `on_return` para mensajes que no pudieron rutearse.
A diferencia de #rpc (que espera la respuesta de un Consumer remoto), aquí solo se espera el ACK del propio broker — no hay round-trip al servicio destino.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/bug_bunny/producer.rb', line 72 def confirmed(request) return_listener = nil return_listener = setup_return_listener(request) started_at = monotonic_now publish_duration = (request) confirm_started_at = monotonic_now acked = wait_for_confirms!(request) confirm_duration = duration_s(confirm_started_at) handle_confirm_result(request, acked) handle_return_result(request, return_listener) log_confirmed(request, publish_duration, confirm_duration, started_at) { 'status' => 202, 'body' => nil } rescue BugBunny::Error raise rescue StandardError => e raise BugBunny::CommunicationError, "Publisher confirms failed: #{e.}" ensure teardown_return_listener(request, return_listener) end |
#fire(request) ⇒ Hash
Envía un mensaje de forma asíncrona (Fire-and-Forget).
Serializa el cuerpo del request, resuelve el exchange aplicando la cascada de configuración y publica el mensaje sin esperar respuesta.
47 48 49 50 51 |
# File 'lib/bug_bunny/producer.rb', line 47 def fire(request) (request) # Devolvemos un hash para evitar NoMethodError en el cliente (que espera una respuesta tipo Hash) { 'status' => 202, 'body' => nil } end |
#rpc(request) ⇒ Hash
Envía un mensaje y bloquea el hilo actual esperando una respuesta (RPC).
Implementa el mecanismo “Direct Reply-to” de RabbitMQ (‘amq.rabbitmq.reply-to`).
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/bug_bunny/producer.rb', line 103 def rpc(request) ensure_reply_listener! request.correlation_id ||= SecureRandom.uuid request.reply_to = 'amq.rabbitmq.reply-to' wait_timeout = request.timeout || BugBunny.configuration.rpc_timeout cid = request.correlation_id.to_s # Creamos un futuro (IVar) que actuará como semáforo future = Concurrent::IVar.new @pending_requests[cid] = future started_at = monotonic_now begin fire(request) safe_log(:debug, 'producer.rpc_waiting', messaging_message_id: cid, timeout_s: wait_timeout) # Bloqueamos el hilo aquí hasta que llegue la respuesta o expire el timeout result = future.value(wait_timeout) raise BugBunny::RequestTimeout, "Timeout waiting for RPC: #{request.path} [#{request.method}]" if result.nil? BugBunny.configuration.on_rpc_reply&.call(result[:headers]) log_rpc_response_received(request, cid, result, started_at) parse_response(result[:body]) ensure # Limpieza vital para evitar fugas de memoria en el mapa @pending_requests.delete(cid) end end |