Module: Pgbus::EventBus::Publisher
- Defined in:
- lib/pgbus/event_bus/publisher.rb
Class Method Summary
- .build_event_data(payload, routing_key: nil) ⇒ Object
- .publish(routing_key, payload, headers: nil, delay: 0) ⇒ Object
- .publish_later(routing_key, payload, delay:, headers: nil) ⇒ Object
Class Method Details
.build_event_data(payload, routing_key: nil) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/pgbus/event_bus/publisher.rb', line 40

# Assembles the string-keyed hash describing a single event.
#
# The payload is normalised by the first rule that applies:
# * it responds to +to_global_id+ -> <tt>{ "_global_id" => payload.to_global_id.to_s }</tt>
# * it is a Hash                  -> used as given (the same object, not a copy)
# * anything else                 -> <tt>{ "value" => payload }</tt>
#
# @param payload [Object] the event payload
# @param routing_key [Object, nil] stored under "routing_key" only when truthy
# @return [Hash] "event_id" (a fresh +SecureRandom.uuid+), "payload",
#   "published_at" (current UTC time as ISO 8601 with 6 fractional digits)
#   and, when given, "routing_key"
def build_event_data(payload, routing_key: nil)
  event = { "event_id" => SecureRandom.uuid }

  event["payload"] =
    if payload.respond_to?(:to_global_id)
      { "_global_id" => payload.to_global_id.to_s }
    elsif payload.is_a?(Hash)
      payload
    else
      { "value" => payload }
    end

  event["published_at"] = Time.now.utc.iso8601(6)
  event["routing_key"] = routing_key if routing_key
  event
end
.publish(routing_key, payload, headers: nil, delay: 0) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/pgbus/event_bus/publisher.rb', line 10

# Publishes an event under +routing_key+.
#
# When +Pgbus::Testing+ is defined and not disabled, nothing is sent to the
# client: a +Pgbus::Event+ is built from the event data and pushed onto
# +Pgbus::Testing.store+. If testing is in inline mode and +delay.to_i+ is not
# positive, each handler registered for +routing_key+ is instantiated and
# called with that event. The event data hash is returned in this mode.
#
# Otherwise the event data is handed to +Pgbus.client.publish_to_topic+ and
# that call's result is returned.
#
# @param routing_key [Object] key the event is published under
# @param payload [Object] payload, normalised by +build_event_data+
# @param headers [Object, nil] forwarded unchanged to the event / client
# @param delay [#to_i] forwarded to the client; in testing mode a positive
#   value suppresses inline handler dispatch
# @return [Hash, Object] the event data hash in testing mode, otherwise the
#   return value of +publish_to_topic+
def publish(routing_key, payload, headers: nil, delay: 0)
  event_data = build_event_data(payload, routing_key: routing_key)

  testing_active = defined?(Pgbus::Testing) && !Pgbus::Testing.disabled?
  unless testing_active
    return Pgbus.client.publish_to_topic(routing_key, event_data, headers: headers, delay: delay)
  end

  stamp = event_data["published_at"]
  event = Pgbus::Event.new(
    event_id: event_data["event_id"],
    payload: event_data["payload"],
    published_at: stamp ? Time.parse(stamp) : nil,
    routing_key: routing_key,
    headers: headers
  )
  Pgbus::Testing.store.push_event(event)

  # Delayed events are only recorded; handlers run inline for immediate ones.
  if Pgbus::Testing.inline? && !delay.to_i.positive?
    subscribers = Pgbus::EventBus::Registry.instance.handlers_for(routing_key)
    subscribers.each { |subscriber| subscriber.handler_class.new.handle(event) }
  end

  event_data
end
.publish_later(routing_key, payload, delay:, headers: nil) ⇒ Object
36 37 38 |
# File 'lib/pgbus/event_bus/publisher.rb', line 36

# Same as +publish+, except that the +delay+ keyword is required.
# All arguments are forwarded to +publish+ unchanged and its result is returned.
#
# @param routing_key [Object] forwarded to +publish+
# @param payload [Object] forwarded to +publish+
# @param delay [Object] required; forwarded to +publish+
# @param headers [Object, nil] forwarded to +publish+
# @return [Object] whatever +publish+ returns
def publish_later(routing_key, payload, delay:, headers: nil)
  publish(
    routing_key,
    payload,
    delay: delay,
    headers: headers
  )
end