Class: Vert::Outbox::Publisher
- Inherits:
-
Object
- Object
- Vert::Outbox::Publisher
- Defined in:
- lib/vert/outbox/publisher.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #connect ⇒ Object
- #connected? ⇒ Boolean
-
#initialize ⇒ Publisher
constructor
A new instance of Publisher.
- #publish(event) ⇒ Object
- #publish_batch(events) ⇒ Object
Constructor Details
#initialize ⇒ Publisher
Returns a new instance of Publisher.
10 11 12 13 14 |
# File 'lib/vert/outbox/publisher.rb', line 10 def initialize @connection = nil @channel = nil @exchange = nil end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
8 9 10 |
# File 'lib/vert/outbox/publisher.rb', line 8 def channel @channel end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
8 9 10 |
# File 'lib/vert/outbox/publisher.rb', line 8 def connection @connection end |
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
8 9 10 |
# File 'lib/vert/outbox/publisher.rb', line 8 def exchange @exchange end |
Class Method Details
.publish_pending(batch_size: 100) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/vert/outbox/publisher.rb', line 69 def publish_pending(batch_size: 100) return { published: 0, failed: 0, error: "OutboxEvent not defined" } unless outbox_event_class results = { published: 0, failed: 0 } with_connection do |publisher| outbox_event_class.publishable.find_in_batches(batch_size: batch_size) do |events| batch_results = publisher.publish_batch(events) results[:published] += batch_results[:published] results[:failed] += batch_results[:failed] end end results end |
.with_connection ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/vert/outbox/publisher.rb', line 61 def with_connection publisher = new publisher.connect yield publisher ensure publisher&.close end |
Instance Method Details
#close ⇒ Object
24 25 26 27 28 29 |
# File 'lib/vert/outbox/publisher.rb', line 24 def close @connection&.close @connection = nil @channel = nil @exchange = nil end |
#connect ⇒ Object
16 17 18 19 20 21 22 |
# File 'lib/vert/outbox/publisher.rb', line 16 def connect @connection = Bunny.new(Vert.config.rabbitmq_url) @connection.start @channel = @connection.create_channel @exchange = @channel.topic(Vert.config.exchange_name, durable: true) self end |
#connected? ⇒ Boolean
31 32 33 |
# File 'lib/vert/outbox/publisher.rb', line 31 def connected? @connection&.open? && @channel&.open? end |
#publish(event) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/vert/outbox/publisher.rb', line 35 def publish(event) connect unless connected? = (event) @exchange.publish( .to_json, routing_key: event.routing_key, persistent: true, content_type: "application/json", message_id: event.id.to_s, timestamp: event.created_at.to_i, headers: event. ) event.mark_as_published! true rescue StandardError => e event.mark_as_failed!(e) false end |
#publish_batch(events) ⇒ Object
54 55 56 57 58 |
# File 'lib/vert/outbox/publisher.rb', line 54 def publish_batch(events) results = { published: 0, failed: 0 } events.each { |event| publish(event) ? results[:published] += 1 : results[:failed] += 1 } results end |