Class: Pgbus::Outbox::Poller
Instance Attribute Summary collapse
Instance Method Summary collapse
included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals
Constructor Details
#initialize(config: Pgbus.configuration) ⇒ Poller
Returns a new instance of Poller.
10
11
12
13
|
# File 'lib/pgbus/outbox/poller.rb', line 10
# Builds a poller that has not yet been asked to stop.
#
# @param config [Object] configuration the poller reads its settings from;
#   defaults to Pgbus.configuration
def initialize(config: Pgbus.configuration)
  # The run loop keeps going for as long as this flag stays false.
  @shutting_down = false
  @config = config
end
|
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
8
9
10
|
# File 'lib/pgbus/outbox/poller.rb', line 8
# Reader for the configuration object stored by the constructor.
#
# @return [Object] the value held in @config
def config
@config
end
|
Instance Method Details
#graceful_shutdown ⇒ Object
35
36
37
|
# File 'lib/pgbus/outbox/poller.rb', line 35
# Requests that the poller stop by raising the @shutting_down flag.
# #run checks that flag between its steps and leaves its loop once it is set.
#
# @return [true]
def graceful_shutdown
@shutting_down = true
end
|
#immediate_shutdown ⇒ Object
39
40
41
|
# File 'lib/pgbus/outbox/poller.rb', line 39
# Requests that the poller stop by raising the @shutting_down flag.
# In the code shown here this does exactly what #graceful_shutdown does.
#
# @return [true]
def immediate_shutdown
@shutting_down = true
end
|
#poll_and_publish ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/pgbus/outbox/poller.rb', line 43
# Publishes unpublished outbox entries in batches of at most
# config.outbox_batch_size, one transaction per batch, until a batch makes
# no progress or comes back smaller than the batch size.
#
# Each batch is selected in id order with "FOR UPDATE SKIP LOCKED", so rows
# already locked by another transaction are skipped rather than waited on.
#
# @return [Integer] number of entries published during this call; 0 when a
#   StandardError was raised (the error is handed to ErrorReporter and not
#   re-raised, even if earlier batches had already been published)
def poll_and_publish
  published = 0
  batch_size = config.outbox_batch_size

  loop do
    fetched = 0
    succeeded = 0

    # The block must finish normally: leaving a transaction block with
    # `break` is a non-local exit, and Active Record has committed or rolled
    # back on that path depending on Rails version and configuration.
    OutboxEntry.transaction do
      entries = OutboxEntry.unpublished
                           .order(:id)
                           .limit(batch_size)
                           .lock("FOR UPDATE SKIP LOCKED")
                           .to_a
      fetched = entries.size
      succeeded = publish_entries(entries) unless entries.empty?
    end

    published += succeeded

    # Stop when nothing was published (empty batch or every entry failed) or
    # when the batch was short, which means no further rows were available
    # to this poller. Entries that failed are picked up by the next poll.
    break if succeeded.zero? || fetched < batch_size
  end

  Pgbus.logger.debug { "[Pgbus] Outbox published #{published} entries" } if published.positive?
  published
rescue StandardError => e
  ErrorReporter.report(e, { action: "outbox_poll" })
  0
end
|
#run ⇒ Object
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/pgbus/outbox/poller.rb', line 15
# Main loop of the poller. Installs signal handling, starts the heartbeat,
# then alternates between processing signals, publishing the outbox and
# sleeping for config.outbox_poll_interval until @shutting_down is set.
# The flag is re-checked after every step so a stop request takes effect
# before the next step begins. Finishes by calling #shutdown.
#
# @return [Object] whatever #shutdown returns
def run
  setup_signals
  start_heartbeat
  Pgbus.logger.info { "[Pgbus] Outbox poller started: interval=#{config.outbox_poll_interval}s" }

  loop do
    break if @shutting_down

    process_signals
    break if @shutting_down

    poll_and_publish
    # Skip the sleep when a stop was requested; the check at the top of the
    # loop then exits.
    interruptible_sleep(config.outbox_poll_interval) unless @shutting_down
  end

  shutdown
end
|