Class: Pgbus::Outbox::Poller

Inherits:
Object
  • Object
show all
Includes:
Process::SignalHandler
Defined in:
lib/pgbus/outbox/poller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Process::SignalHandler

included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals

Constructor Details

#initialize(config: Pgbus.configuration) ⇒ Poller

Returns a new instance of Poller.



10
11
12
13
# File 'lib/pgbus/outbox/poller.rb', line 10

# Builds a poller that starts out in the "not shutting down" state.
#
# @param config [Object] configuration the poller reads its outbox settings
#   from; defaults to Pgbus.configuration
def initialize(config: Pgbus.configuration)
  @shutting_down = false
  @config = config
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



8
9
10
# File 'lib/pgbus/outbox/poller.rb', line 8

# Reader for the configuration object that was handed to the constructor.
#
# @return [Object] the value stored in @config
def config
  @config
end

Instance Method Details

#graceful_shutdown ⇒ Object



35
36
37
# File 'lib/pgbus/outbox/poller.rb', line 35

# Flags the poller as shutting down.
#
# This only sets @shutting_down; #run checks that flag between the steps of
# its loop and leaves the loop once it is true.
# NOTE(review): presumably invoked by Process::SignalHandler when a signal is
# processed -- confirm against that module.
def graceful_shutdown
  @shutting_down = true
end

#immediate_shutdown ⇒ Object



39
40
41
# File 'lib/pgbus/outbox/poller.rb', line 39

# Flags the poller as shutting down.
#
# As written this has exactly the same effect as #graceful_shutdown: it only
# sets @shutting_down and does not itself interrupt work already in progress.
def immediate_shutdown
  @shutting_down = true
end

#poll_and_publish ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/pgbus/outbox/poller.rb', line 43

# Publishes unpublished outbox entries in batches until the outbox is drained.
#
# Every batch is selected (ordered by id, capped at config.outbox_batch_size,
# locked with FOR UPDATE SKIP LOCKED) and handed to #publish_entries inside
# its own OutboxEntry.transaction. Polling stops after a batch that was empty,
# that published nothing, or that was smaller than the batch size.
#
# The transaction block is only ever left by reaching its end or via +next+,
# never via +break+. OutboxEntry looks like an ActiveRecord model, and a
# non-local exit from an ActiveRecord transaction block is deprecated in
# Rails 6.1 and rolls the transaction back in Rails 7.0, which would undo a
# batch that had just been published. The decision to stop is therefore
# carried out of the block in +drained+ and acted on by the outer loop.
#
# @return [Integer] number of entries published during this call; 0 when a
#   StandardError was raised (the error is passed to ErrorReporter.report)
def poll_and_publish
  published = 0
  batch_size = config.outbox_batch_size

  loop do
    # Assume there is nothing more to do unless a full batch made progress.
    drained = true

    OutboxEntry.transaction do
      entries = OutboxEntry.unpublished
                           .order(:id)
                           .limit(batch_size)
                           .lock("FOR UPDATE SKIP LOCKED")
                           .to_a
      # `next` ends the block normally, so the (empty) transaction completes.
      next if entries.empty?

      succeeded = publish_entries(entries)
      published += succeeded
      # A short batch means the outbox held fewer rows than we asked for;
      # zero successes means another pass would not make progress either.
      drained = succeeded.zero? || entries.size < batch_size
    end

    break if drained
  end

  Pgbus.logger.debug { "[Pgbus] Outbox published #{published} entries" } if published.positive?
  published
rescue StandardError => e
  ErrorReporter.report(e, { action: "outbox_poll" })
  0
end

#run ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/pgbus/outbox/poller.rb', line 15

# Main loop of the poller.
#
# Installs signal handling, starts the heartbeat, logs the poll interval and
# then repeats: process pending signals, poll the outbox, sleep for
# config.outbox_poll_interval. The @shutting_down flag is consulted before
# every step, so a shutdown request skips whatever steps remain in the
# current pass. #shutdown is called once the loop has been left.
def run
  setup_signals
  start_heartbeat
  Pgbus.logger.info { "[Pgbus] Outbox poller started: interval=#{config.outbox_poll_interval}s" }

  loop do
    break if @shutting_down

    process_signals
    # Skip the poll if signal processing just requested a shutdown; the
    # check below then ends the loop in either case.
    poll_and_publish unless @shutting_down
    break if @shutting_down

    interruptible_sleep(config.outbox_poll_interval)
  end

  shutdown
end