Module: Pgbus::EventBus::Publisher

Defined in:
lib/pgbus/event_bus/publisher.rb

Class Method Summary

Class Method Details

.build_event_data(payload, routing_key: nil) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/pgbus/event_bus/publisher.rb', line 40

# Builds the string-keyed envelope hash that describes a single event.
#
# The payload is normalised as follows:
# * objects responding to +to_global_id+ become <tt>{ "_global_id" => "<gid string>" }</tt>
# * Hashes are used as-is (the very same object, not a copy)
# * anything else is wrapped as <tt>{ "value" => payload }</tt>
#
# @param payload [Object] the event payload
# @param routing_key [String, nil] added under "routing_key" only when truthy
# @return [Hash] envelope with "event_id", "payload", "published_at" and,
#   optionally, "routing_key" (inserted in that order)
def build_event_data(payload, routing_key: nil)
  envelope = { "event_id" => SecureRandom.uuid }

  envelope["payload"] =
    if payload.respond_to?(:to_global_id)
      { "_global_id" => payload.to_global_id.to_s }
    elsif payload.is_a?(Hash)
      payload
    else
      { "value" => payload }
    end

  # UTC timestamp in ISO 8601 form with six fractional-second digits.
  envelope["published_at"] = Time.now.utc.iso8601(6)
  envelope["routing_key"] = routing_key if routing_key
  envelope
end

.publish(routing_key, payload, headers: nil, delay: 0) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/pgbus/event_bus/publisher.rb', line 10

# Publishes an event for +routing_key+.
#
# When +Pgbus::Testing+ is defined and not disabled, nothing is sent to the
# client: a +Pgbus::Event+ is pushed onto +Pgbus::Testing.store+ instead and,
# if the testing mode is inline and +delay+ is not positive, every handler
# registered for the routing key is invoked right away. Otherwise the event
# data is handed to +Pgbus.client.publish_to_topic+.
#
# @param routing_key [String] topic routing key
# @param payload [Object] event payload (see +build_event_data+)
# @param headers [Hash, nil] optional headers forwarded with the event
# @param delay [#to_i] delay passed to the client; a positive value
#   suppresses inline handling under testing
# @return [Object] the event data hash under testing, otherwise whatever
#   +publish_to_topic+ returns
def publish(routing_key, payload, headers: nil, delay: 0)
  event_data = build_event_data(payload, routing_key: routing_key)

  testing_active = defined?(Pgbus::Testing) && !Pgbus::Testing.disabled?
  unless testing_active
    return Pgbus.client.publish_to_topic(routing_key, event_data, headers: headers, delay: delay)
  end

  stamp = event_data["published_at"]
  event = Pgbus::Event.new(
    event_id: event_data["event_id"],
    payload: event_data["payload"],
    published_at: stamp ? Time.parse(stamp) : nil,
    routing_key: routing_key,
    headers: headers
  )

  Pgbus::Testing.store.push_event(event)

  # Delayed events are only recorded; they are never dispatched inline.
  dispatch_now = Pgbus::Testing.inline? && delay.to_i <= 0
  if dispatch_now
    subscribers = Pgbus::EventBus::Registry.instance.handlers_for(routing_key)
    subscribers.each { |subscriber| subscriber.handler_class.new.handle(event) }
  end

  event_data
end

.publish_later(routing_key, payload, delay:, headers: nil) ⇒ Object



36
37
38
# File 'lib/pgbus/event_bus/publisher.rb', line 36

# Variant of +publish+ in which the delay must be given explicitly.
#
# @param routing_key [String] topic routing key
# @param payload [Object] event payload
# @param delay [Object] required delay, forwarded unchanged to +publish+
# @param headers [Hash, nil] optional headers, forwarded unchanged
# @return [Object] whatever +publish+ returns
def publish_later(routing_key, payload, delay:, headers: nil)
  publish(routing_key, payload, delay: delay, headers: headers)
end