Class: Pgbus::Process::Consumer
- Inherits:
- Object
- Includes:
- SignalHandler
- Defined in:
- lib/pgbus/process/consumer.rb
Instance Attribute Summary
- #config ⇒ Object (readonly)
  Returns the value of attribute config.
- #execution_mode ⇒ Object (readonly)
  Returns the value of attribute execution_mode.
- #threads ⇒ Object (readonly)
  Returns the value of attribute threads.
- #topics ⇒ Object (readonly)
  Returns the value of attribute topics.
Instance Method Summary
- #graceful_shutdown ⇒ Object
- #immediate_shutdown ⇒ Object
- #initialize(topics:, threads: 3, config: Pgbus.configuration, execution_mode: :threads) ⇒ Consumer (constructor)
  A new instance of Consumer.
- #run ⇒ Object
Methods included from SignalHandler
included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals
Constructor Details
#initialize(topics:, threads: 3, config: Pgbus.configuration, execution_mode: :threads) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/pgbus/process/consumer.rb', line 12
def initialize(topics:, threads: 3, config: Pgbus.configuration, execution_mode: :threads)
  @topics = Array(topics)
  @threads = threads
  @config = config
  @execution_mode = ExecutionPools.normalize_mode(execution_mode)
  @shutting_down = false
  @pool = ExecutionPools.build(mode: @execution_mode, capacity: threads)
  @registry = EventBus::Registry.instance
end
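The constructor passes topics through Array(), so callers can supply either a single topic or a list. The sketch below illustrates only that argument handling. SketchConsumer and the topic names are hypothetical stand-ins, not part of Pgbus, and the config, execution_mode, pool and registry setup is deliberately left out.

```ruby
# Hypothetical stand-in that mirrors only the constructor's argument handling;
# it is not the real Pgbus::Process::Consumer.
class SketchConsumer
  attr_reader :topics, :threads

  def initialize(topics:, threads: 3)
    @topics = Array(topics) # wraps a single value, leaves an array untouched
    @threads = threads
  end
end

single = SketchConsumer.new(topics: "orders.created")
many   = SketchConsumer.new(topics: ["orders.created", "orders.paid"], threads: 5)

p single.topics  # a one-element array
p single.threads # the default thread count
p many.topics    # the array exactly as given
```

Because Array(nil) returns an empty array, passing topics: nil to this sketch yields a consumer with no topics rather than raising an error. Whether the real class treats that case as valid is not stated in the source.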
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
# File 'lib/pgbus/process/consumer.rb', line 10
def config
  @config
end
#execution_mode ⇒ Object (readonly)
Returns the value of attribute execution_mode.
# File 'lib/pgbus/process/consumer.rb', line 10
def execution_mode
  @execution_mode
end
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
# File 'lib/pgbus/process/consumer.rb', line 10
def threads
  @threads
end
#topics ⇒ Object (readonly)
Returns the value of attribute topics.
# File 'lib/pgbus/process/consumer.rb', line 10
def topics
  @topics
end
Instance Method Details
#graceful_shutdown ⇒ Object
# File 'lib/pgbus/process/consumer.rb', line 38
def graceful_shutdown
  @shutting_down = true
end
#immediate_shutdown ⇒ Object
# File 'lib/pgbus/process/consumer.rb', line 42
def immediate_shutdown
  @shutting_down = true
  @pool.kill
end
#run ⇒ Object
# File 'lib/pgbus/process/consumer.rb', line 22
def run
  setup_signals
  start_heartbeat
  setup_subscriptions

  Pgbus.logger.info { "[Pgbus] Consumer started: topics=#{topics.join(",")} threads=#{threads}" }

  loop do
    break if @shutting_down
    process_signals
    consume
  end

  shutdown
end
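The run loop checks @shutting_down once per iteration, so a graceful shutdown takes effect only at the start of the next pass, after the current consume call has returned. A minimal sketch of that flag-based pattern follows. SketchLoop, stop_after and its consume body are hypothetical. The real consume, process_signals and shutdown methods are not shown in the source and are not reproduced here.

```ruby
# Hypothetical stand-in for the flag-checked loop; only graceful_shutdown,
# run and @shutting_down correspond to names in the documented class.
class SketchLoop
  attr_reader :iterations

  def initialize(stop_after:)
    @shutting_down = false
    @iterations = 0
    @stop_after = stop_after
  end

  def graceful_shutdown
    @shutting_down = true
  end

  def run
    loop do
      break if @shutting_down # checked before any work in each pass

      consume
    end
    :stopped
  end

  private

  # Stand-in for message consumption: counts passes and requests shutdown
  # once the limit is reached, much as a signal handler might.
  def consume
    @iterations += 1
    graceful_shutdown if @iterations >= @stop_after
  end
end

worker = SketchLoop.new(stop_after: 3)
result = worker.run
puts "#{result} after #{worker.iterations} iterations"
```

In the documented class, immediate_shutdown sets the same flag and additionally calls @pool.kill, so the difference between the two lies in what happens to work already handed to the pool. The sketch covers only the flag.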